Unverified Commit 59733fc7 authored by Dan Jones's avatar Dan Jones
Browse files

feat: in progress websocket client

parent 96722a8d
import axios from 'axios';
import WebSocket from 'faye-websocket';
/**
* Handle authentication and send/receive with the backbone
......@@ -72,6 +73,7 @@ class Adapter {
})
.then((response) => {
this.credentials = response.data;
if (this.getMode() === 'socket') this.openSocket();
return response;
})
.catch((error) => {
......@@ -96,14 +98,7 @@ class Adapter {
})
.then((response) => {
response.data.forEach((message) => {
const parsed = JSON.parse(message.message);
const validation = this.validate(parsed);
if (validation.valid) {
const type = this.protocol.getType(parsed);
this.protocol.decode(type, parsed);
} else {
this.protocol.receivedInvalid(parsed, validation);
}
this.receive(message);
});
return response;
})
......@@ -125,6 +120,49 @@ class Adapter {
});
}
receive(message) {
const parsed = JSON.parse(message.message);
const validation = this.validate(parsed);
if (validation.valid) {
const type = this.protocol.getType(parsed);
this.protocol.decode(type, parsed);
} else {
this.protocol.receivedInvalid(parsed, validation);
}
}
getMode() {
return ('mode' in this.config) ? this.config.mode : 'http';
}
openSocket() {
this.getAuthorizationHeader()
.then((headers) => {
const socket = new WebSocket.Client(
this.config.socket,
protocols,
options
);
socket.on('message', function(event) {
const message = event.data;
this.receive(message);
});
socket.on('close', function() {
this.closeSocket();
});
this.socket = socket;
return socket;
});
}
closeSocket() {
this.socket = null;
}
/**
* Publish a message to the backbone with the specified topic
*
......@@ -134,7 +172,17 @@ class Adapter {
* @param {boolean} is_retry
* @returns
*/
publish(topic, body, is_retry = false) {
publish(topic, body) {
let mode = this.getMode();
let response;
switch (mode) {
case 'http': response = this.http_publish(topic, body); break;
case 'socket': response = this.socket_publish(topic, body); break;
}
return response;
}
http_publish(topic, body, is_retry = false) {
let adapterConfig = this.config;
return this.getAuthorizationHeader()
.then((headers) => {
......@@ -165,11 +213,15 @@ class Adapter {
retry = true;
}
}
if (retry && !is_retry) return this.publish(topic, body, true);
if (retry && !is_retry) return this.http_publish(topic, body, true);
else return Promise.reject(error);
});
}
socket_publish(topic, body) {
this.socket.send(JSON.stringify({ topic, body }));
}
/**
* Broadcast the message on the backbone
*
......@@ -182,7 +234,17 @@ class Adapter {
* @param {boolean} is_retry
* @returns
*/
broadcast(body, is_retry = false) {
broadcast(body) {
let mode = this.getMode();
let response;
switch (mode) {
case 'http': response = this.http_broadcast(body); break;
case 'socket': response = this.socket_broadcast(body); break;
}
return response;
}
http_broadcast(body, is_retry = false) {
let adapterConfig = this.config;
return this.getAuthorizationHeader()
.then((headers) => {
......@@ -212,10 +274,14 @@ class Adapter {
retry = true;
}
}
if (retry && !is_retry) return this.broadcast(body, true);
if (retry && !is_retry) return this.http_broadcast(body, true);
else return Promise.reject(error);
});
}
socket_broadcast(body) {
this.socket.send(JSON.stringify({ body }));
}
}
export { Adapter };
'use strict';
var axios = require('axios');
var WebSocket = require('faye-websocket');
/**
* Handle authentication and send/receive with the backbone
......@@ -74,6 +75,7 @@ class Adapter {
})
.then((response) => {
this.credentials = response.data;
if (this.getMode() === 'socket') this.openSocket();
return response;
})
.catch((error) => {
......@@ -98,14 +100,7 @@ class Adapter {
})
.then((response) => {
response.data.forEach((message) => {
const parsed = JSON.parse(message.message);
const validation = this.validate(parsed);
if (validation.valid) {
const type = this.protocol.getType(parsed);
this.protocol.decode(type, parsed);
} else {
this.protocol.receivedInvalid(parsed, validation);
}
this.receive(message);
});
return response;
})
......@@ -127,6 +122,49 @@ class Adapter {
});
}
receive(message) {
const parsed = JSON.parse(message.message);
const validation = this.validate(parsed);
if (validation.valid) {
const type = this.protocol.getType(parsed);
this.protocol.decode(type, parsed);
} else {
this.protocol.receivedInvalid(parsed, validation);
}
}
getMode() {
return ('mode' in this.config) ? this.config.mode : 'http';
}
openSocket() {
this.getAuthorizationHeader()
.then((headers) => {
const socket = new WebSocket.Client(
this.config.socket,
protocols,
options
);
socket.on('message', function(event) {
const message = event.data;
this.receive(message);
});
socket.on('close', function() {
this.closeSocket();
});
this.socket = socket;
return socket;
});
}
closeSocket() {
this.socket = null;
}
/**
* Publish a message to the backbone with the specified topic
*
......@@ -136,7 +174,17 @@ class Adapter {
* @param {boolean} is_retry
* @returns
*/
publish(topic, body, is_retry = false) {
publish(topic, body) {
let mode = this.getMode();
let response;
switch (mode) {
case 'http': response = this.http_publish(topic, body); break;
case 'socket': response = this.socket_publish(topic, body); break;
}
return response;
}
http_publish(topic, body, body, is_retry = false) {
let adapterConfig = this.config;
return this.getAuthorizationHeader()
.then((headers) => {
......@@ -167,11 +215,15 @@ class Adapter {
retry = true;
}
}
if (retry && !is_retry) return this.publish(topic, body, true);
if (retry && !is_retry) return this.http_publish(topic, body, true);
else return Promise.reject(error);
});
}
socket_publish(topic, body) {
this.socket.send(JSON.stringify({ topic, body }));
}
/**
* Broadcast the message on the backbone
*
......@@ -184,7 +236,17 @@ class Adapter {
* @param {boolean} is_retry
* @returns
*/
broadcast(body, is_retry = false) {
broadcast(body) {
let mode = this.getMode();
let response;
switch (mode) {
case 'http': response = this.http_broadcast(body); break;
case 'socket': response = this.socket_broadcast(body); break;
}
return response;
}
http_broadcast(body, is_retry = false) {
let adapterConfig = this.config;
return this.getAuthorizationHeader()
.then((headers) => {
......@@ -214,10 +276,14 @@ class Adapter {
retry = true;
}
}
if (retry && !is_retry) return this.broadcast(body, true);
if (retry && !is_retry) return this.http_broadcast(body, true);
else return Promise.reject(error);
});
}
socket_broadcast(body) {
this.socket.send(JSON.stringify({ body }));
}
}
exports.Adapter = Adapter;
......@@ -37,7 +37,9 @@
"dependencies": {
"faye-websocket": "^0.11.4",
"json-schema-remote": "^1.6.2",
"swagger-model-validator": "^3.0.21"
"net": "^1.0.2",
"swagger-model-validator": "^3.0.21",
"tls": "^0.0.1"
},
"devDependencies": {
"@babel/core": "^7.0.0-0",
......@@ -46,7 +48,6 @@
"@commitlint/config-conventional": "^15.0.0",
"@cucumber/cucumber": "^8.10.0",
"@rollup/plugin-babel": "^6.0.3",
"axios": "^1.2.3",
"axios-mock-adapter": "^1.21.2",
"babel-jest": "^27.4.4",
"backbone-adapter-testsuite": "git+https://git.noc.ac.uk/communications-backbone-system/backbone-adapter-testsuite.git#v0.1.0",
......
exports.default = [
{
external: ['axios'],
input: 'src/adapter/index.js',
output: [
{
......@@ -13,6 +14,7 @@ exports.default = [
],
},
{
external: ['axios'],
input: 'src/protocol/index.js',
output: [
{
......
import axios from 'axios';
import WebSocket from 'faye-websocket';
/**
* Handle authentication and send/receive with the backbone
......@@ -72,6 +73,7 @@ export class Adapter {
})
.then((response) => {
this.credentials = response.data;
if (this.getMode() === 'socket') this.openSocket();
return response;
})
.catch((error) => {
......@@ -96,14 +98,7 @@ export class Adapter {
})
.then((response) => {
response.data.forEach((message) => {
const parsed = JSON.parse(message.message);
const validation = this.validate(parsed);
if (validation.valid) {
const type = this.protocol.getType(parsed);
this.protocol.decode(type, parsed);
} else {
this.protocol.receivedInvalid(parsed, validation);
}
this.receive(message);
});
return response;
})
......@@ -125,10 +120,49 @@ export class Adapter {
});
}
receive(message) {
const parsed = JSON.parse(message.message);
const validation = this.validate(parsed);
if (validation.valid) {
const type = this.protocol.getType(parsed);
this.protocol.decode(type, parsed);
} else {
this.protocol.receivedInvalid(parsed, validation);
}
}
getMode() {
return ('mode' in this.config) ? this.config.mode : 'http';
}
openSocket() {
this.getAuthorizationHeader()
.then((headers) => {
const socket = new WebSocket.Client(
this.config.socket,
protocols,
options
);
socket.on('message', function(event) {
const message = event.data;
this.receive(message);
});
socket.on('close', function() {
this.closeSocket();
});
this.socket = socket;
return socket;
});
}
closeSocket() {
this.socket = null;
}
/**
* Publish a message to the backbone with the specified topic
*
......@@ -139,10 +173,11 @@ export class Adapter {
* @returns
*/
publish(topic, body) {
let mode = self.getMode();
let mode = this.getMode();
let response;
switch (mode) {
case 'http': response = this.http_publish(topic, body); break;
case 'socket': response = this.socket_publish(topic, body); break;
}
return response;
}
......@@ -183,6 +218,10 @@ export class Adapter {
});
}
socket_publish(topic, body) {
this.socket.send(JSON.stringify({ topic, body }));
}
/**
* Broadcast the message on the backbone
*
......@@ -196,10 +235,11 @@ export class Adapter {
* @returns
*/
broadcast(body) {
let mode = self.getMode();
let mode = this.getMode();
let response;
switch (mode) {
case 'http': response = this.http_broadcast(body); break;
case 'socket': response = this.socket_broadcast(body); break;
}
return response;
}
......@@ -238,4 +278,8 @@ export class Adapter {
else return Promise.reject(error);
});
}
socket_broadcast(body) {
this.socket.send(JSON.stringify({ body }));
}
}
This diff is collapsed.
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment