You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 

182 lines
6.1 KiB

'use strict'
const EventEmitter = require('events').EventEmitter;
const util = require('./util');
const initSocket = require('./init_socket');
const connectSocket = require('./connect_socket');
class Client {
constructor(port, host, protocol = 'tcp', net, tls) {
this.net = net;
this.tls = tls;
this.id = 0;
this.port = port;
this.host = host;
this.callback_message_queue = {};
this.subscribe = new EventEmitter();
this.conn = initSocket(this, protocol);
this.mp = new util.MessageParser((body, n) => {
this.onMessage(body, n);
});
this.status = 0
}
async connect() {
if (this.status) {
return Promise.resolve({error: false, data: ""});
}
const connectionResponse = await connectSocket(this.conn, this.port, this.host);
this.status = connectionResponse.error === true ? 0 : 1;
return Promise.resolve(connectionResponse);
}
close() {
if (!this.status) {
return;
}
this.conn.end();
//this.conn.destroy();
this.status = 0;
}
request(method, params) {
if (!this.status) {
return Promise.reject(new Error('Connection to server lost, please retry'));
}
return new Promise((resolve, reject) => {
const id = ++this.id;
const content = util.makeRequest(method, params, id);
this.callback_message_queue[id] = util.createPromiseResult(resolve, reject);
this.conn.write(content + '\n');
});
}
requestObjectBatch(method, params, secondParam) {
return new Promise((resolve, reject) => {
let arguments_far_calls = {};
let contents = [];
const { key, data } = params;
if (Array.isArray(data)) {
for (let item of data) {
const id = ++this.id;
let param = "";
if (key in item) param = item[key];
if (secondParam !== undefined) {
contents.push(util.makeRequest(method, [param, secondParam], id));
} else {
contents.push(util.makeRequest(method, [param], id));
}
arguments_far_calls[id] = { param, data: item };
}
} else {
for (let item of Object.keys(data)) {
const id = ++this.id;
let param = "";
if (key in data[item]) param = data[item][key];
if (secondParam !== undefined) {
contents.push(util.makeRequest(method, [param, secondParam], id));
} else {
contents.push(util.makeRequest(method, [param], id));
}
arguments_far_calls[id] = { param, data: data[item] };
}
}
const content = '[' + contents.join(',') + ']';
this.callback_message_queue[this.id] = util.createPromiseResultBatch(resolve, reject, arguments_far_calls);
// callback will exist only for max id
this.conn.write(content + '\n');
});
}
requestBatch(method, params, secondParam) {
if (!this.status) {
return Promise.reject(new Error('Connection to server lost, please retry'));
}
if (typeof params === "object" && "key" in params && "data" in params) {
return this.requestObjectBatch(method, params, secondParam)
}
return new Promise((resolve, reject) => {
let arguments_far_calls = {};
let contents = [];
for (let param of params) {
const id = ++this.id;
let data = {};
if (secondParam !== undefined) {
contents.push(util.makeRequest(method, [param, secondParam], id));
} else {
contents.push(util.makeRequest(method, [param], id));
}
arguments_far_calls[id] = { param, data };
}
const content = '[' + contents.join(',') + ']';
this.callback_message_queue[this.id] = util.createPromiseResultBatch(resolve, reject, arguments_far_calls);
// callback will exist only for max id
this.conn.write(content + '\n');
});
}
response(msg) {
let callback;
if (!msg.id && msg[0] && msg[0].id) {
// this is a response from batch request
for (let m of msg) {
if (m.id && this.callback_message_queue[m.id]) {
callback = this.callback_message_queue[m.id];
delete this.callback_message_queue[m.id];
}
}
} else {
callback = this.callback_message_queue[msg.id];
}
if (callback) {
delete this.callback_message_queue[msg.id];
if (msg.error) {
callback(msg.error);
} else {
callback(null, msg.result || msg);
}
} else {
console.log("Can't get callback"); // can't get callback
}
}
onMessage(body, n) {
try {
const msg = JSON.parse(body);
if (msg instanceof Array) {
this.response(msg);
} else {
if (msg.id !== void 0) {
this.response(msg);
} else {
this.subscribe.emit(msg.method, msg.params);
}
}
} catch (error) {
this.conn.end();
this.conn.destroy();
this.onClose(error);
}
}
onConnect(){
}
onClose(){
this.status = 0;
Object.keys(this.callback_message_queue).forEach((key) => {
this.callback_message_queue[key](new Error('close connect'))
delete this.callback_message_queue[key]
})
}
onRecv(chunk){
this.mp.run(chunk)
}
onError(e){
console.log('OnError:' + e);
}
}
module.exports = Client;