'use strict';

const assert = require('assert');
const util = require('util');
const Socket = require('net').Socket;
const JSStream = process.binding('js_stream').JSStream;
const Buffer = require('buffer').Buffer;
const uv = process.binding('uv');
const debug = util.debuglog('stream_wrap');

// Wraps `stream` so that it can be used as the backing handle of a
// `net.Socket`.
//
// A `JSStream` handle is created and each of its hooks (`close`, `isAlive`,
// `isClosing`, `onreadstart`, `onreadstop`, `onshutdown`, `onwrite`) is
// pointed at the matching method of this wrapper. `stream` is paused, its
// `data` / `end` / `error` events are forwarded, and finally the `Socket`
// constructor is invoked with the handle.
//
// `stream` must provide `pause()`, `on()`, `once()` and `removeListener()`;
// the prototype methods additionally use `resume()`, `cork()`, `uncork()`,
// `write()` and `end()`.
function StreamWrap(stream) {
  const self = this;
  const handle = new JSStream();

  this.stream = stream;

  // Head of the circular list of pending write/shutdown requests
  this._list = null;

  // Hooks look the method up on `self` at call time, so methods replaced
  // after construction are still honoured.
  handle.close = function close(cb) {
    debug('close');
    self.doClose(cb);
  };
  handle.isAlive = function isAlive() {
    return self.isAlive();
  };
  handle.isClosing = function isClosing() {
    return self.isClosing();
  };
  handle.onreadstart = function onreadstart() {
    return self.readStart();
  };
  handle.onreadstop = function onreadstop() {
    return self.readStop();
  };
  handle.onshutdown = function onshutdown(req) {
    return self.doShutdown(req);
  };
  handle.onwrite = function onwrite(req, bufs) {
    return self.doWrite(req, bufs);
  };

  // Errors of the wrapped stream surface on the wrapper
  function onerror(err) {
    self.emit('error', err);
  }

  // Only Buffer chunks can be passed to the handle; anything else means the
  // wrapped stream decodes its data, which is reported as an error.
  function ondata(chunk) {
    if (chunk instanceof Buffer) {
      debug('data', chunk.length);
      if (self._handle)
        self._handle.readBuffer(chunk);
      return;
    }

    // Make sure that no further `data` events will happen
    this.pause();
    this.removeListener('data', ondata);

    self.emit('error', new Error('Stream has StringDecoder'));
  }

  // End of the wrapped stream is signalled to the handle as EOF
  function onend() {
    debug('end');
    if (self._handle)
      self._handle.emitEOF();
  }

  // Stay paused until the handle asks for data through `onreadstart`
  stream.pause();
  stream.on('error', onerror);
  stream.on('data', ondata);
  stream.once('end', onend);

  // Must come last: `stream` and the listeners have to be in place before
  // the Socket starts using the handle.
  Socket.call(this, { handle: handle });
}
// StreamWrap extends net.Socket; the constructor above finishes by calling
// `Socket.call(this, ...)`.
util.inherits(StreamWrap, Socket);
module.exports = StreamWrap;

// Also expose the constructor as a property of itself, so that
// `require('_stream_wrap').StreamWrap` resolves to the same function.
StreamWrap.StreamWrap = StreamWrap;

// Liveness query, reached through the handle's `isAlive` hook.
// A wrapper always reports itself as alive.
StreamWrap.prototype.isAlive = function isAlive() {
  return true;
};

// Closing-state query, reached through the handle's `isClosing` hook.
// The wrapper counts as closing unless it is both readable and writable.
StreamWrap.prototype.isClosing = function isClosing() {
  const fullyOpen = this.readable && this.writable;
  return !fullyOpen;
};

// Invoked through the handle's `onreadstart` hook: lets the wrapped stream
// flow again. Always returns 0 (presumably a "success" status for the
// handle - confirm against the JSStream binding).
StreamWrap.prototype.readStart = function readStart() {
  const source = this.stream;
  source.resume();
  return 0;
};

// Invoked through the handle's `onreadstop` hook: pauses the wrapped stream.
// Always returns 0 (presumably a "success" status for the handle - confirm
// against the JSStream binding).
StreamWrap.prototype.readStop = function readStop() {
  const source = this.stream;
  source.pause();
  return 0;
};

// Invoked through the handle's `onshutdown` hook with the shutdown request
// `req`.
//
// The request is queued (so that `doClose()` is able to cancel it), the
// wrapped stream is ended, and once the end callback has fired the request
// is completed with status 0 via `handle.finishShutdown()`. Always
// returns 0.
StreamWrap.prototype.doShutdown = function doShutdown(req) {
  const wrap = this;
  const handle = wrap._handle;
  const queued = wrap._enqueue('shutdown', req);

  function finish() {
    // A request that is no longer queued has already been completed
    // elsewhere (see doClose()) and must not be finished twice.
    if (wrap._dequeue(queued))
      handle.finishShutdown(req, 0);
  }

  wrap.stream.end(function onfinish() {
    // Ensure that write was dispatched
    setImmediate(finish);
  });

  return 0;
};

// Invoked through the handle's `onwrite` hook with the write request `req`
// and the array of chunks `bufs`.
//
// All chunks are written to the wrapped stream inside a single
// cork()/uncork() pair. The request is completed - exactly once - when
// either every chunk callback has fired without error or the first error is
// reported. Completion calls `handle.doAfterWrite(req)` followed by
// `handle.finishWrite(req, errCode)`, where `errCode` is 0 on success, the
// `uv.UV_<code>` value matching `err.code` when there is one, and
// `uv.UV_EPIPE` otherwise. Always returns 0.
StreamWrap.prototype.doWrite = function doWrite(req, bufs) {
  const wrap = this;
  const handle = wrap._handle;

  // Number of chunk callbacks that still have to fire
  var remaining = bufs.length;

  // Queue the request to be able to cancel it
  const queued = wrap._enqueue('write', req);

  // Translates a write error (if any) into the status passed to the handle
  function toErrCode(err) {
    if (!err)
      return 0;

    const known = err.code && uv['UV_' + err.code];
    return known ? known : uv.UV_EPIPE;
  }

  function complete(err) {
    // Do not invoke callback twice
    if (!wrap._dequeue(queued))
      return;

    const errCode = toErrCode(err);
    handle.doAfterWrite(req);
    handle.finishWrite(req, errCode);
  }

  function done(err) {
    if (!err) {
      remaining--;
      if (remaining !== 0)
        return;
    }

    // Ensure that this is called once in case of error: later successful
    // callbacks decrement below zero and therefore never reach this point.
    remaining = 0;

    // Ensure that write was dispatched
    setImmediate(complete, err);
  }

  wrap.stream.cork();
  for (var i = 0; i < bufs.length; i++)
    wrap.stream.write(bufs[i], done);
  wrap.stream.uncork();

  return 0;
};

// Node of the circular, doubly linked list of pending requests.
//
// `type` is the kind of request ('write' or 'shutdown' in this file) and
// `req` the request object itself. A fresh item links to itself in both
// directions, i.e. it forms a ring of one; `_dequeue()` later sets both
// links to `null` to mark the item as removed.
function QueueItem(type, req) {
  // Payload
  this.type = type;
  this.req = req;

  // Ring links
  this.prev = this;
  this.next = this;
}

// Creates a QueueItem for (`type`, `req`), adds it to the list of pending
// requests and returns it, so that the caller can `_dequeue()` it later.
//
// The first item becomes the head (`this._list`); every later item is
// spliced in directly after the head.
StreamWrap.prototype._enqueue = function enqueue(type, req) {
  const item = new QueueItem(type, req);
  const head = this._list;

  if (head !== null) {
    const after = head.next;

    item.next = after;
    item.prev = head;
    after.prev = item;
    head.next = item;
  } else {
    this._list = item;
  }

  return item;
};

// Removes `item` from the list of pending requests.
//
// Returns `true` when the item was unlinked by this call and `false` when
// it had already been removed before (both of its links are `null`), which
// lets callers make sure that a request is completed only once.
// Asserts that `item` is a QueueItem.
StreamWrap.prototype._dequeue = function dequeue(item) {
  assert(item instanceof QueueItem);

  const after = item.next;
  const before = item.prev;

  // Already removed
  if (after === null && before === null)
    return false;

  // Mark as removed
  item.next = null;
  item.prev = null;

  // An item that links to itself is the only one in the ring
  const wasOnly = after === item;
  if (!wasOnly) {
    before.next = after;
    after.prev = before;
  }

  // Removing the head promotes its successor, or empties the list
  if (this._list === item)
    this._list = wasOnly ? null : after;

  return true;
};

// Invoked through the handle's `close` hook.
//
// On the next `setImmediate` turn every request that is still queued is
// removed from the list and completed with `uv.UV_ECANCELED`: write requests
// via `doAfterWrite()` + `finishWrite()`, shutdown requests via
// `finishShutdown()`. Afterwards `cb` is called.
//
// The handle is captured up front, because `this._handle` is expected to be
// `null` by the time the deferred part runs (this is asserted).
StreamWrap.prototype.doClose = function doClose(cb) {
  const wrap = this;
  const handle = wrap._handle;

  // Completes a single, already dequeued request as cancelled
  function cancel(type, req) {
    const errCode = uv.UV_ECANCELED;

    switch (type) {
      case 'write':
        handle.doAfterWrite(req);
        handle.finishWrite(req, errCode);
        break;
      case 'shutdown':
        handle.finishShutdown(req, errCode);
        break;
    }
  }

  function drain() {
    var item;
    while ((item = wrap._list) !== null) {
      const req = item.req;
      wrap._dequeue(item);
      cancel(item.type, req);
    }

    // Should be already set by net.js
    assert(wrap._handle === null);
    cb();
  }

  setImmediate(drain);
};