Сервер будемо писати на python , використовуючи фреймворк Tornado . Як транспорт для повідомлень буде використаний сервер Redis .
Встановлення сервера Redis:
sudo apt-get install redis-server
Установка Tornado:
pip install tornado==4.5.3 redis tornado-redis
Заготівля сервера, файл server.py.
#!/usr/bin/env python
import tornado.ioloop
import tornado.web
# шаблонізатор
loader = tornado.template.Loader(".")
# утиліта навантаження сервера при змінах коду
від tornado import autoreload
# Обробник вебсокетів
from tornado.websocket import WebSocketHandler
import json
import datetime
import time
import hashlib
import tornadoredis
# Створюємо клієнта Redis у глобальній змінній
# для надсилання повідомлень безпосередньо до каналу з усіх місць
import redis
redis_client = redis.Redis(host='localhost', port=6379, db=0)
# віддає головну сторінку
class MainHandler(tornado.web.RequestHandler):
def get(self):
self.render("index.html")
# Глобальний словник для зберігання поточних з'єднань
ws_clients = {}
class WebsocketHandler(tornado.websocket.WebSocketHandler):
# Передплата на канал (асинхронно)
@tornado.gen.coroutine
def listen_redis(self, channel_id):
self.client = tornadoredis.Client()
self.client.connect()
yield tornado.gen.Task(self.client.subscribe, channel_id)
self.client.listen(self.redis_message)
# обробник надходження повідомлень з редису
def redis_message(self,message):
if(message.kind != 'subscribe'):
data = message.body
print(data)
# обробник відкриття з'єднання
def open(self):
print('Open connection')
# генеруємо унікальний ідентифікатор клієнта з таймстампу
sign = hashlib.md5(str(datetime.datetime.now()).encode('utf-8')).hexdigest()
self.client_id = sign
# додавання нового з'єднання до глобального словника
ws_clients[sign] = {}
ws_clients[sign]['connection'] = self
ws_clients[sign]['username'] = 'undefined'
# Передплата каналу Redis
self.listen_redis(self.client_id)
# Обробник надходження повідомлення з клієнта по вебсокету
def on_message(self, message):
print('got message %s' % message)
# обробник закриття з'єднання клієнтом
def on_close(self):
print('close connection')
видалення з'єднання з глобального словника
del ws_clients[self.client_id]
# конфігуруємо додаток роутингом
def make_app():
return tornado.web.Application([
# Головна сторінка
(r"/", MainHandler),
# віддача статики
(r'/static/(.*)', tornado.web.StaticFileHandler, {'path': 'static'}),
# запити по веб-сокету
(r"/websocket", WebsocketHandler),
])
if __name__ == "__main__":
print('Starting server on 8888 port')
autoreload.start()
autoreload.watch('.')
autoreload.watch('index.html')
app = make_app()
app.listen(8888)
tornado.ioloop.IOLoop.current().start()
HTML форма логіну.
В елементі socketId зберігатимемо ідентифікатор з'єднання на сервері.
Авторизаційний запит
У ньому ми заберемо ім'я користувача та передамо його на сервер по веб-сокету.
// ws
var ws = new WebSocket("ws://localhost:8888/websocket");
// Login
const loginButton = document.getElementById('loginButton');
const username = document.getElementById('username');
loginButton.addEventListener('click', login);
function login(){
console.log('Logining');
let message = {'action': 'login', 'message': username.value}
ws.send(JSON.stringify(message));
}
Відпрацюємо на сервері процес авторизації користувача функції on_message.
def on_message(self, message):
print('got message %s' % message)
message = json.loads(message)
if message['action'] == 'login':
print('Login with name %s' % message['message'])
ws_clients[self.client_id]['username'] = message['message']
# надсилаємо клієнту його ідентифікатор з'єднання на сервері
self.write_message(\
{\
'action': 'set_connection_id',\
'message': self.client_id\
}\
)
print(ws_clients[self.client_id])
Ми додали логін користувача до словника, у якому перебувають усі «живі» з'єднання і віддали клієнту його внутрішній ідентифікатор сервера.
Додамо цей ідентифікатор на форму сторінки, визначивши обробник onmessage об'єкт ws.
ws.onmessage = function (evt) {
let jdata = JSON.parse(evt.data);
if( jdata['action'] == 'set_connection_id'){
console.log(`Set sign $`);
socketId.value = jdata['message'];
}
}
Результат праці:
Опишемо 3 основні сигнальні функції серверної частини:
- offer – запит на підключення
- answer – відповідь на запит віддаленою точкою
- candidate – обмін ICE кандидатами
if message['action'] == 'offer':
print("Sending offer to: %s" % message['destination'])
message['initiator_id'] = self.client_id
redis_client.publish(\
message['destination'],\
json.dumps(message)\
)
if message['action'] == 'answer':
print("Sending answer to: %s" % message['destination'])
redis_client.publish(\
message['destination'],\
json.dumps(message)\
)
if message['action'] == 'candidate':
print("Sending ICE candidate to: %s" % message['destination'])
redis_client.publish(\
message['destination'],\
json.dumps(message)\
)
Всі функції однотипні і просто надсилають повідомлення в канал Redis з ідентифікатором, вказаним у параметрі destination повідомлення.
Залишилося відпрацювати ці функції за клієнта. Але перш, необхідно отримати на клієнті список онлайн користувачів, щоб мати можливість вибрати одного для з'єднання.
Додамо наступну функцію на сервері, яка формує цей список і віддає його всім онлайн-з'єднанням:
def send_cliens_to_all():
clnts = []
# сформуємо список з'єднань
for key, value in ws_clients.items():
clnts.append({
'id': key,
'username': value['username']
})
# відправимо список у кожне з'єднання
for key, value in ws_clients.items():
ws_clients[key]['connection'].write_message(\
{\
'action': 'update_clients',\
'message': json.dumps(clnts)\
}\
)
Тепер викличемо цю функцію при логіні та завершенні з'єднання.
def on_message(self, message):
...
send_cliens_to_all()
...
# обробник закриття з'єднання клієнтом
def on_close(self):
print('close connection')
видалення з'єднання з глобального словника
del ws_clients[self.client_id]
send_cliens_to_all()
Додамо елемент ul в код шаблону, де буде наш список користувачів онлайн.
Будемо його оновлювати при надходженні повідомлення з параметром action рівним 'update_clients' :
ws.onmessage = function (evt) {
let jdata = JSON.parse(evt.data);
...
if( jdata['action'] == 'update_clients'){
console.log(`Updating clients $`);
clientList.empty();
let data = JSON.parse(jdata['message']);
data.forEach(function (item, index) {
clientList.find(`#$`).on('click',call);
});
Ми проходимо циклом за списком активних користувачів і до кожного елемента додаємо посилання для виклику, на яку вішаємо функцію-обробник call.
Розберемо процес взаємодії між користувачами для встановлення передачі одного відеоаудіо потоку на прикладі наступної UML діаграми послідовностей.
В результаті такої взаємодії, ми отримуємо два об'єкти RTCPeerConnection по одному на кожну точку peer. У кожному з'єднанні встановлено два SDP об'єкти в Local та Remote параметрах, кожне з яких містить список ICE кандидатів від віддаленого та локального хоста. Дані SDP та ICE передаються між клієнтами за допомогою сигнального сервера у вказаній на діаграмі послідовності.
Наш код буде розділено на 4 частини:
- Ініціалізація.
- Опис логіки обробників веб-з'єднання.
- Опис логіки клієнтської програми.
- Сервісна частина
Ініціалізація
Під час цього процесу ми знаходимо всі елементи на сторінці та визначаємо їх у коді як змінні. Потім підвішуємо на потрібні елементи обробники. Створюємо RTCPeerConnection об'єкти ps1 і ps2 у кожному з додатків і теж «чіпляємо» ними обробники. Однак, у нашому прикладі ми будемо використовувати по одному RTCPeerConnection об'єкту з кожної програми. Поки що ми хочемо передати лише один потік і в одному напрямку.
Крім ініціалізації, створюється ряд конфігураційних змінних та об'єкт веб-сокет з'єднання.
//// Ініціалізація /////
const startButton = $('#startButton');
const callButton = $('#callButton');
const hangupButton = $('#stopButton');
startButton.on('click', start);
callButton.on('click', call);
hangupButton.on('click', stop);
const localVideo = document.getElementById('localVideo');
const remoteVideo = document.getElementById('remoteVideo');
var localStream;
let pc1;
let pc2;
const offerOptions = {
Відповідьвідповідь:1,
ofToReceiveVideo: 1
};
// ws
var ws = new WebSocket("ws://localhost:8888/websocket");
const configuration = {};
// Створюємо об'єкти RTCPeerConnection з порожньою конфігурацією
console.log('RTCPeerConnection configuration:', configuration);
pc1 = новий RTCPeerConnection(configuration);
console.log('Created local peer connection object pc1');
pc2 = новий RTCPeerConnection(configuration);
console.log('Created remote peer connection object pc2');
// Додаємо обробники на подію додавання ICE кандидата
pc1.addEventListener('icecandidate', e => onIceCandidate(pc1, e, 'pc1'));
pc2.addEventListener('icecandidate', e => onIceCandidate(pc2, e, 'pc2'));
// Login
const loginButton = $('#loginButton');
const username = $('#username');
const socketId = $('#socketId');
const abonentId = $('#abonentId');
const clientList = $('#clientList');
loginButton.on('click', login);
Опис логіки обробників веб-сокет з'єднання
Наведений далі код має докладні коментарі і не потребує пояснення.
/// Обробник onmessage socket з'єднання ////
ws.onmessage = function (evt) {
let jdata = JSON.parse(evt.data);
// Установка поточного ідентифікатора сокет-з'єднання,
// згенерованого сервером Tornado
if( jdata['action'] == 'set_connection_id'){
console.log(`Set sign $`);
socketId.val(jdata['message']);
}
// оновлення списку користувачів онлайн
if( jdata['action'] == 'update_clients'){
console.log(`Updating clients $`);
clientList.empty();
let data = JSON.parse(jdata['message']);
data.forEach(function (item, index) {
console.log(item);
clientList.find(`#$`).on('click',call);
});
}
// Початок зоїнення з віддаленим хостом у відповідь на його запит
if( jdata['action'] == 'offer'){
console.log('Geting offer');
// Обробник додавання потоку на друге з'єднання
pc2.addEventListener('track', gotRemoteStream);
// встановлюємо initiator_id в input щоб знати
// його під час передачі ICE кандидата
abonentId.val(jdata['initiator_id']);
// встановлення RemoteDescription
pc2.setRemoteDescription(jdata['offer']).then(function(){
console.log('pc2.setRemoteDescription');
// генерація у відповідь SDP offer
pc2.createAnswer().then(function(answer){
// в onCreateAnswerSuccess ми надішлемо answer в сигнальний сервер
// та встановимо pc2.setLocalDescription
onCreateAnswerSuccess(answer,jdata['initiator_id']);
},onCreateSessionDescriptionError);
},onSetSessionDescriptionError);
}
// реакція на запит answer
if( jdata['action'] == 'answer'){
console.log('Geting answer');
pc1.setRemoteDescription(jdata['offer']).then(onSetRemoteSuccess,onSetSessionDescriptionError)
}
// Обмін ICE кандидатами
if( jdata['action'] == 'candidate'){
console.log('ICE candidate');
if (jdata['candidate'] != null){
let candidate = new RTCIceCandidate(jdata['candidate']);
// ми використовуємо параметр pc для того, щоб визначити для якого з'єднання
// Приходить кандидат призначається
if(jdata['pc'] == 'pc1'){
console.log(`Addinng ICE to pc1`);
pc1.addIceCandidate(candidate).then(onAddIceCandidateSuccess,onAddIceCandidateError);
} else {
console.log('Addinng ICE to pc2');
pc2.addIceCandidate(candidate).then(onAddIceCandidateSuccess,onAddIceCandidateError);
}
}
}
}
Опис логіки клієнтської програми
function login(){
console.log('Login button clecked');
let message = {'action': 'login', 'message': username.val()}
ws.send(JSON.stringify(message));
}
// Включення камери та отримання медіа-потоків.
async function start() {
console.log('Requesting local stream');
startButton.hide();
hangupButton.show();
try {
const stream = await navigator.mediaDevices.getUserMedia();
console.log('Received local stream');
localVideo.srcObject = stream;
localStream = stream;
callButton.show();
} catch (e) {
alert(`getUserMedia() error: $`);
}
};
// Точка входу в ініціювання з'єднання (клік на посилання користувача онлайн).
async function call(evt) {
// заберемо ідентифікатор абонента
let con_id = $(evt.target).attr('id');
// встановлюємо їх у input у тому щоб знати
// destination під час передачі ICE кандидата
abonentId.val(con_id);
// Встановимо стан кнопок
callButton.disabled = true;
hangupButton.disabled = false;
// Отримуємо та виводимо інформацію про медіа-потоки
const videoTracks = localStream.getVideoTracks();
const audioTracks = localStream.getAudioTracks();
if (videoTracks.length > 0) {
console.log(`Using video device: $`);
}
if (audioTracks.length > 0) {
console.log(`Using audio device: $`);
}
// Дістаємо медіа-доріжки з поточного stream об'єкта та передаємо їх в об'єкт RTCPeerConnection
localStream.getTracks().forEach(track => pc1.addTrack(track, localStream));
// Формуємо offer з PC1
try {
console.log('pc1 createOffer start');
const offer = await pc1.createOffer(offerOptions);
// в onCreateOfferSuccess ми відправимо offer сигнальному серверу
// та встановимо pc1.setLocalDescription
await onCreateOfferSuccess(offer,con_id);
} catch (e) {
console.log(`$`);
}
}
function stop() {
console.log('Stop')
// тут все обнулятимемо
console.log('Ending call');
localStream.getTracks().forEach(function(track) {
track.stop();
});
pc1.close();
pc2.close();
pc1 = null;
pc2 = null;
hangupButton.hide();
startButton.show();
};
// Функція формування offer
async function onCreateOfferSuccess(desc, conn_id) {
console.log('pc1 setLocalDescription start');
// надсилаємо запит offer на сервер
let message = {'action': 'offer', 'offer': desc, 'destination': conn_id}
ws.send(JSON.stringify(message));
try {
await pc1.setLocalDescription(desc);
onSetLocalSuccess(pc1);
} catch (e) {
console.log(`error setting description to pc1 $`);
}
}
// Додавання ICE кандидатів.
function onIceCandidate(pc, event, dest) {
// надсилаємо запит candidate на сервер
if(dest=='pc1') {
var message = {
'action': 'candidate',
'PC': 'PC2',
'candidate': event.candidate,
'destination': abonentId.val()
}
} else {
var message = {
'action': 'candidate',
'pc': 'pc1',
'candidate': event.candidate,
'destination': abonentId.val()
}
}
ws.send(JSON.stringify(message));
}
// Функція формування у відповідь SDP offer
async function onCreateAnswerSuccess(desc,initiator_id) {
console.log(`Answer from pc2`);
// посилаємо відповідь answer на сервер
let message = {'action': 'answer', 'offer': desc, 'destination': initiator_id}
ws.send(JSON.stringify(message));
try {
await pc2.setLocalDescription(desc);
onSetLocalSuccess(pc2);
} catch (e) {
onSetSessionDescriptionError(e);
}
}
// Функція додавання потоку до елемента remoteVideo
function gotRemoteStream(e) {
if (remoteVideo.srcObject !== e.streams[0]) {
remoteVideo.srcObject = e.streams[0];
console.log('pc2 received remote stream');
}
}
Сервісні функції для налагодження
function getName(pc) {
return (pc === pc1)? 'pc1' : 'pc2';
}
function getOtherPc(pc) {
return (pc === pc1)? pc2 : pc1;
}
function onSetLocalSuccess(pc) {
console.log(`$ setLocalDescription complete`);
}
function onSetRemoteSuccess(pc) {
console.log(`$ setRemoteDescription complete`);
}
function onCreateSessionDescriptionError(error) {
console.log(`Failed to create session description: $`);
}
function onAddIceCandidateSuccess(pc) {
console.log(`$ addIceCandidate success`);
}
function onAddIceCandidateError(error) {
console.log(` failed to add ICE Candidate: $`);
}
function onSetSessionDescriptionError(error) {
console.log(`Error setting SESSION description: $`);
}
localVideo.addEventListener('loadedmetadata', function() {
console.log(`Local video videoWidth: $px, videoHeight: $px`);
});
remoteVideo.addEventListener('loadedmetadata', function() {
console.log(`Remote video videoWidth: $px, videoHeight: $px`);
});
Код шаблону HTML
Результат роботи нашої програми зображений нижче.
На закінчення
Це була остання стаття із циклу, де ми розглянули основні стадії встановлення з'єднання та передачі інформації між його учасниками, а також алгоритми взаємодії між програмними компонентами програми. Підписуйтесь на наш блог, щоб отримати більше корисних матеріалів.
Якщо вас зацікавили можливості, які надає WebRTC-програма – залиште заявку на сайті. Фахівці Wezom створять справді ефективне рішення для вашого бізнесу.