#ifndef CROMP_SERVER_IMPLEMENTATION_FILE
#define CROMP_SERVER_IMPLEMENTATION_FILE

/*****************************************************************************\
*                                                                             *
*  Name   : cromp_server                                                      *
*  Author : Chris Koeritz                                                     *
*                                                                             *
*******************************************************************************
* Copyright (c) 2000-$now By Author.  This program is free software; you can  *
* redistribute it and/or modify it under the terms of the GNU General Public  *
* License as published by the Free Software Foundation; either version 2 of   *
* the License or (at your option) any later version.  This is online at:      *
*     http://www.fsf.org/copyleft/gpl.html                                    *
* Please send any updates to: fred@gruntose.com                               *
\*****************************************************************************/

#include "cromp_common.h"
#include "cromp_security.h"
#include "cromp_server.h"

#include <basis/function.h>
#include <basis/istring.h>
#include <basis/log_base.h>
#include <basis/portable.h>
#include <basis/mutex.h>
#include <data_struct/amorph.cpp>
#include <data_struct/unique_id.h>
#include <mechanisms/ithread.h>
#include <mechanisms/thread_cabinet.h>
#include <octopus/entity_data_bin.h>
#include <octopus/entity_defs.h>
#include <octopus/identity_infoton.h>
#include <octopus/infoton.h>
#include <octopus/tentacle.h>
#include <octopus/unhandled_request.h>
#include <sockets/address.h>
#include <sockets/tcpip_stack.h>
#include <sockets/spocket.h>
#include <tentacles/key_repository.h>
#include <tentacles/login_tentacle.h>

#ifndef OMIT_CRYPTO_SUPPORT
  #include <tentacles/encryption_tentacle.h>
  #include <tentacles/encryption_wrapper.h>
#endif

//#define DEBUG_CROMP_SERVER
  // uncomment for noisy version.

const int DEAD_CLIENT_CLEANING_INTERVAL = 1 * SECOND_ms;
  // we will drop any clients that have disconnected this long ago.

const int MAXIMUM_ACTIONS_PER_CLIENT = 4000;
  // this is the maximum number of things we'll do in one run for a
  // client, including both sends and receives.

const int SEND_TRIES_ALLOWED = 1;
  // the number of attempts we will make to get outgoing data to send.

const int SEND_THRESHOLD = 512 * KILOBYTE;
  // if we pile up some data to this point in our client gathering, we'll
  // go ahead and start pushing it to the client.

const int EXTREME_SEND_TRIES_ALLOWED = 28;
  // if we're clogged, we'll push this many times to get data out.

const int MAXIMUM_BYTES_PER_SEND = 2 * MEGABYTE;
  // the maximum size we want our buffer to grow.

const int MAXIMUM_SIZE_BATCH = 384 * KILOBYTE;
  // the largest chunk of updates we'll try to grab at one time.

const int DROPPING_INTERVAL = 500;
  // the rate at which we'll check for dead clients and clean up.

const int DATA_AWAIT_TIMEOUT = 14;
  // how long the server zones out waiting for data.

const int ACCEPTANCE_SNOOZE = 60;
  // if the server sees no clients, it will take a little nap.

#undef LOG
#define LOG(to_print) CLASS_EMERGENCY_LOG(program_wide_logger(), to_print)

////////////////////////////////////////////////////////////////////////////

// forward.
class cromp_client_record;

class cromp_data_grabber : public ithread
{
public:
  cromp_data_grabber(cromp_client_record &parent, octopus *octo)
      : ithread(), _parent(parent), _octo(octo) {}

  IMPLEMENT_CLASS_NAME("cromp_data_grabber");

  virtual void perform_activity(void *);

private:
  cromp_client_record &_parent;
  octopus *_octo;
};
 
////////////////////////////////////////////////////////////////////////////

class cromp_client_record : public cromp_common
{
public:
  cromp_client_record(cromp_server &parent, spocket *client, octopus *octo)
  : cromp_common(client, octo),
    _parent(parent),
    _octo(octo),
    _ent(),
    _healthy(true),
    _fixated(false),
    _grabber(*this, octo),
    _waiting(),
    _still_connected(true)
  {
    internet_address local_addr = internet_address
        (internet_address::localhost(), client->stack().hostname(), 0);
    open_common(local_addr);  // open the common support for biz.
    _grabber.start(NIL);  // crank up our background data pump on the socket.
  }

  ~cromp_client_record() {
    croak();
  }

  IMPLEMENT_CLASS_NAME("cromp_client_record");

  bool handle_client_needs(ithread &prompter) {
    FUNCDEF("handle_client_needs");
#ifdef DEBUG_CROMP_SERVER
    time_stamp start;
#endif
    if (!_healthy) return false;  // done.
    if (!spock()->connected()) {
      _still_connected = false;
      return false;  // need to stop now.
    }
    bool keep_going = true;
    int actions = 0;
    while (keep_going && (actions < MAXIMUM_ACTIONS_PER_CLIENT) ) {
      // make sure we don't overstay our welcome when the thread's supposed
      // to quit.
      if (prompter.should_stop()) return false;
      keep_going = false;  // only continue if there's a reason.
      bool ret = get_incoming_data(actions);  // look for requests.
      if (ret) keep_going = true;
      ret = push_client_replies(actions);  // send replies back to the client.
      if (ret) keep_going = true;
    }


#ifdef DEBUG_CROMP_SERVER
    if (actions > 10) {
      LOG(isprintf("actions=%d", actions));
      LOG(isprintf("%d pending send bytes, %d bytes accumulated, bin has "
          "%d items.", pending_sends(), accumulated_bytes(),
          octo()->responses().items_held()));

    int duration = int(time_stamp().value() - start.value());
    if (duration > 200) {
      LOG(isprintf("duration=%d ms.", duration));
    }
#endif

    return true;
  }

  const octopus_entity &ent() const { return _ent; }

  // stops the background activity of this object and drops the connection
  // to the client.
  void croak() {
    FUNCDEF("croak");
    _grabber.stop();
    int actions = 0;
    while (get_incoming_data(actions)) {
      // keep receiving whatever's there already.  we are trying to drain
      // the socket before destroying it.
    }
    _healthy = false;
    close_common();
  }

  bool healthy() const { return _healthy; }
    // this is true unless the object has been told to shut down.

  bool still_connected() const { return _still_connected; }
    // this is true unless the client side dropped the connection.

  cromp_server &parent() const { return _parent; }

  bool push_client_replies(int &actions) {
    FUNCDEF("push_client_replies");
    if (!healthy()) return false;
    if (ent().blank()) {
      // not pushing replies if we haven't even gotten a command yet.
#ifdef DEBUG_CROMP_SERVER
      LOG("not pushing replies for blank.");
#endif
      return false;
    }

    if (buffer_clog(MAXIMUM_BYTES_PER_SEND)) {
LOG("buffer clog being cleared now.");
      // the buffers are pretty full; we'll try later.
      push_outgoing(EXTREME_SEND_TRIES_ALLOWED);
      // if we're still clogged, then leave.
      if (buffer_clog(MAXIMUM_BYTES_PER_SEND)) {
LOG("could not completely clear buffer clog.");
        return true;
      }
LOG("cleared out buffer clog.");
    }

    int any_left = true;
    while (actions++ < MAXIMUM_ACTIONS_PER_CLIENT) {
      // make sure we're not wasting our time.
      if (!_octo->responses().items_held()) {
        any_left = false;
        break;
      }
      // make sure we don't ignore receptions.
      grab_anything(false);
      // try to grab a result for this entity.
      int num_located = _octo->responses().acquire_for_entity(ent(),
          _waiting, MAXIMUM_SIZE_BATCH);
      if (!num_located) {
        any_left = false;
        break;
      }

#ifndef OMIT_CRYPTO_SUPPORT
      // if we're encrypting, we need to wrap these as well.
      if (_parent.encrypting()) {
        for (int i = 0; i < _waiting.elements(); i++) {
          infoton *curr = _waiting[i]->_data;
          infoton *processed = _parent.wrap_infoton(curr,
              _waiting[i]->_id._entity);
          if (processed) _waiting[i]->_data = processed;  // replace infoton.
        }
      }
#endif

      outcome ret = pack_and_ship(_waiting, 0);
        // no attempt to send yet; we're just stuffing the buffer.
      if ( (ret != cromp_common::OKAY) && (ret != cromp_common::TIMED_OUT) ) {
//hmmm: what about keeping transmission as held in list; retry later on it?

//#ifdef DEBUG_CROMP_SERVER
        LOG(istring("failed to send package back to client: ")
            + cromp_common::outcome_name(ret));
//#endif
        any_left = false;
        break;
      }

      if (pending_sends() > SEND_THRESHOLD) {
#ifdef DEBUG_CROMP_SERVER
        LOG(istring("over sending threshold on ") + _ent.text_form());
#endif
        push_outgoing(SEND_TRIES_ALLOWED);
      }

    }
    // now that we've got a pile possibly, we'll try to send them out.
    push_outgoing(SEND_TRIES_ALLOWED);
    if (!spock()->connected()) {
#ifdef DEBUG_CROMP_SERVER
      LOG("noticed disconnection of client.");
#endif
      _still_connected = false;
    }
    return any_left;
  }

  bool get_incoming_data(int &actions) {
    FUNCDEF("get_incoming_data");
    if (!healthy()) return false;
    int first_one = true;
    bool saw_something = false;  // true if we got a packet.
    while (actions++ < MAXIMUM_ACTIONS_PER_CLIENT) {
      // pull in anything waiting.
      infoton *item = NIL;
      octopus_request_id req_id;
      outcome ret = retrieve_and_restore_any(item, req_id,
          first_one? DATA_AWAIT_TIMEOUT : 0);
      first_one = false;
      if (ret == cromp_common::TIMED_OUT) {
        actions--;  // didn't actually eat one.
        return false;
      } else if (ret != cromp_common::OKAY) {
#ifdef DEBUG_CROMP_SERVER
        LOG(istring("got error ") + cromp_common::outcome_name(ret));
#endif
        if (ret == cromp_common::NO_CONNECTION) {
#ifdef DEBUG_CROMP_SERVER
          LOG("noticed disconnection of client.");
#endif
          _still_connected = false;
        }
        actions--;  // didn't actually eat one.
        return false;  // get outa here.
      }
      // got a packet.
      saw_something = true;
      if (!_fixated) {
        if (req_id._entity.blank()) {
          LOG(istring("would have assigned ours to blank id! ")
              + req_id._entity.mangled_form());
          WHACK(item);
          continue;
        }
#ifdef DEBUG_CROMP_SERVER
        LOG(istring("cmd with entity ") + req_id._entity.mangled_form());
#endif
        if (_ent.blank()) {
          // assign the entity id now that we know it.
          _ent = req_id._entity;
#ifdef DEBUG_CROMP_SERVER
          LOG(istring("assigned own entity to ") + _ent.mangled_form());
#endif
        } else if (!_fixated && (_ent != req_id._entity) ) {
#ifdef DEBUG_CROMP_SERVER
          LOG(istring("fixated on entity of ") + req_id._entity.mangled_form()
              + " where we used to have " + _ent.mangled_form());
#endif
          _ent = req_id._entity;
          _fixated = true;
        }
      }  // connects to line after debug just below.
#ifdef DEBUG_CROMP_SERVER
        else if (_ent != req_id._entity) {
        // this checks the validity of the entity.
#ifdef DEBUG_CROMP_SERVER
        LOG(istring("seeing wrong entity of ") + req_id._entity.mangled_form()
            + " when we fixated on " + _ent.mangled_form());
#endif
        WHACK(item);
        continue;
      }
#endif
      // check again so we make sure we're still healthy; could have changed
      // state while getting a command.
      if (!healthy()) {
        WHACK(item);
        continue;
      }
      string_array classif = item->classifier();
        // hang onto the classifier since the next time we get a chance, the
        // object might be destroyed.

      // we pass responsibility for this item over to the octopus.  that's why
      // we're not deleting it once evaluate gets the item.
      ret = _octo->evaluate(item, req_id, _parent.instantaneous());
      if (ret != tentacle::OKAY) {
#ifdef DEBUG_CROMP_SERVER
        LOG(istring("failed to evaluate the infoton we got: ")
            + classif.text_form());
#endif
//hmmm: we have upgraded this response to be for all errors, since otherwise
//      clients will just time out waiting for something that's never coming.

        // we do a special type of handling when the tentacle is missing.  this
        // is almost always because the wrong type of request is being sent to
        // a server, or the server didn't register for all the objects it is
        // supposed to handle.
/////        if (ret == tentacle::NOT_FOUND) {
//#ifdef DEBUG_CROMP_SERVER
          LOG(istring("injecting unhandled note into response stream for ")
              + req_id.text_form() + ", got outcome " + outcome_name(ret));
//#endif
          _parent.send_to_client(req_id,
              new unhandled_request(req_id, classif, ret));
            // this will always work, although it's not a surety that the
            // client actually still exists.  probably though, since we're
            // just now handling this request.
/////        }
      }
    }
    return saw_something;  // keep going if we actually did anything good.
  }

private:
  cromp_server &_parent;  // the object that owns this client.
  octopus *_octo;
  octopus_entity _ent;  // the entity by which we know this client.
  bool _healthy;  // reports our current state of happiness.
  bool _fixated;  // true if the entity id has become firm.
  cromp_data_grabber _grabber;  // the data grabbing thread.
  infoton_list _waiting;
    // used by the push_client_replies() method; allocated once to avoid churn.
  bool _still_connected;
    // set to true up until we notice that the client disconnected.
};

////////////////////////////////////////////////////////////////////////////

void cromp_data_grabber::perform_activity(void *)
{
  FUNCDEF("perform_activity");
  while (!should_stop()) {
//    time_stamp started;
    bool ret = _parent.handle_client_needs(*this);
//    int duration = int(time_stamp().value() - started.value());
    if (!ret) {
      // they said to stop.
#ifdef DEBUG_CROMP_SERVER
      LOG("done handling client needs.");
#endif
      _octo->expunge(_parent.ent());
      break;
    }
  }
}

////////////////////////////////////////////////////////////////////////////

class cromp_client_list : public amorph<cromp_client_record>
{
public:
  int find(const octopus_entity &to_find) const {
    for (int i = 0; i < elements(); i++)
      if (to_find == get(i)->ent()) return i;
    return common::NOT_FOUND;
  }
};

////////////////////////////////////////////////////////////////////////////

class client_dropping_thread : public ithread
{
public:
  client_dropping_thread (cromp_server &parent)
  : ithread(DROPPING_INTERVAL),
    _parent(parent) {}

  void perform_activity(void *formal(ptr)) {
    FUNCDEF("perform_activity");
    _parent.drop_dead_clients(); 
  }

private:
  cromp_server &_parent;  // we perform tricks for this object.
};

////////////////////////////////////////////////////////////////////////////

class connection_management_thread : public ithread
{
public:
  connection_management_thread(cromp_server &parent)
  : ithread(),
    _parent(parent) {}

  void perform_activity(void *formal(ptr)) {
    FUNCDEF("perform_activity");
    _parent.look_for_clients(*this); 
  }

private:
  cromp_server &_parent;  // we perform tricks for this object.
};

////////////////////////////////////////////////////////////////////////////

#undef LOCK_LISTS
#define LOCK_LISTS auto_synchronizer l(*_list_lock)
  // takes over access to the client list and root socket.

cromp_server::cromp_server(const internet_address &where,
    int accepting_threads, bool instantaneous, int max_per_ent)
: cromp_common(cromp_common::chew_hostname(where), max_per_ent),
  _clients(new cromp_client_list),
  _accepters(new thread_cabinet),
  _list_lock(new mutex),
  _next_droppage(new time_stamp(DEAD_CLIENT_CLEANING_INTERVAL)),
  _instantaneous(instantaneous),
  _where(new internet_address(where)),
  _accepting_threads(accepting_threads),
  _dropper(new client_dropping_thread(*this)),
  _enabled(false),
  _encrypt_arm(NIL),
  _default_security(new cromp_security)
{
  FUNCDEF("constructor");
}
 
cromp_server::~cromp_server()
{
  disable_servers();
  _encrypt_arm = NIL;
  WHACK(_accepters);
  WHACK(_dropper);
  WHACK(_clients);
  WHACK(_next_droppage);
  WHACK(_where);
  WHACK(_default_security);
  WHACK(_list_lock);
}

internet_address cromp_server::location() const { return *_where; }

bool cromp_server::get_sizes(const octopus_entity &id, int &items, int &bytes)
{ return octo()->responses().get_sizes(id, items, bytes); }

internet_address cromp_server::any_address(int port)
{
  const byte any_list[] = { 0, 0, 0, 0 };
  return internet_address(byte_array(4, any_list), "", port);
}

istring cromp_server::responses_text_form() const
{ return octo()->responses().text_form(); }

int cromp_server::DEFAULT_ACCEPTERS() {
  // default number of listening threads; this is the maximum number of mostly
  // simultaneous connections that the server can pick up at a time.
#ifdef EMBEDDED_BUILD
  return 1;  // calm number of threads for embedded systems.
#else
  return 7;  // others are not generally so limited on resources.
#endif
}

#ifndef OMIT_CRYPTO_SUPPORT
infoton *cromp_server::wrap_infoton(infoton * &request,
    const octopus_entity &ent)
{
  FUNCDEF("wrap_infoton");
  // identity is not wrapped with encryption; we need to establish and identity
  // to talk on a distinct channel with the server.  even if that identity were
  // compromised, the interloper should still not be able to listen in on the
  // establishment of an encryption channel.  also, the encryption startup
  // itself is not encrypted and we don't want to re-encrypt the wrapper.
  if (dynamic_cast<identity_infoton *>(request)
      || dynamic_cast<encryption_infoton *>(request)
      || dynamic_cast<encryption_wrapper *>(request)) return NIL;

#ifdef DEBUG_CROMP_SERVER
  LOG(istring("encrypting ") + request->text_form());
#endif

  octenc_key_record *key = _encrypt_arm->keys().lock(ent);
    // lock here is released a bit down below.
  if (!key) {
    LOG(istring("failed to locate key for entity ") + ent.text_form());
    return NIL;
  }
  byte_array packed_request;
  infoton::fast_pack(packed_request, *request);
  WHACK(request);
  encryption_wrapper *to_return = new encryption_wrapper;
  key->_key.encrypt(packed_request, to_return->_wrapped);
  _encrypt_arm->keys().unlock(key);
  return to_return;
}
#endif

void cromp_server::enable_servers(bool encrypt, cromp_security *security)
{
  FUNCDEF("enable_servers");
  if (encrypt) {
#ifndef OMIT_CRYPTO_SUPPORT
    // add the tentacles needed for encryption.
#ifdef DEBUG_CROMP_SERVER
    LOG(istring("enabling encryption for ") + class_name()
        + " on " + _where->text_form());
#endif
    _encrypt_arm = new encryption_tentacle;
    add_tentacle(_encrypt_arm, true);
    add_tentacle(new unwrapping_tentacle, false);
#endif
  }
  if (security) {
    add_tentacle(new login_tentacle(*security), true);
  } else {
    add_tentacle(new login_tentacle(*_default_security), true);
  }
  open_common(*_where);  // open the common ground.

  accept_one_client(false);  // try first accept, no waiting.

#ifdef DEBUG_CROMP_SERVER
  LOG(isprintf("adding %d accepting threads.", _accepting_threads));
#endif
  for (int i = 0; i < _accepting_threads; i++) {
    // crank in a new thread and tell it yes on starting it.
    _accepters->add_thread(new connection_management_thread(*this), true, NIL);
  }

  _dropper->start(NIL);
  _enabled = true;
}

void cromp_server::disable_servers()
{
  FUNCDEF("disable_servers");
  _dropper->stop();  // signal the thread to leave when it can.
  _accepters->stop_all();  // signal the accepting threads to exit.
  if (_clients) {
    LOCK_LISTS;
      // make sure no one rearranges or uses the client list while we're
      // working on it.
    for (int i = 0; i < _clients->elements(); i++) {
      // stop the client's activities before the big shutdown.
      cromp_client_record *cli = (*_clients)[i];
      if (cli) cli->croak();
    }
  }

  close_common();  // zap the socket so that our blocked waiters get woken up.

  // now finalize the shutdown.  we don't grab the lock because we don't want
  // a deadlock, but we also shouldn't need to grab the lock.  by here, we have
  // cancelled all threads, no new clients should be able to be added, and the
  // destruction of this list will ensure that each client's thread really is
  // stopped.
  WHACK(_clients);

  _enabled = false;  // record our defunctivity.
}

int cromp_server::clients() const
{
  LOCK_LISTS;
  return _clients? _clients->elements() : 0;
}

bool cromp_server::find_entity(const octopus_entity &id,
    internet_address &found)
{
  FUNCDEF("find_entity");
  found = internet_address();
  LOCK_LISTS;
  int indy = _clients->find(id);
  if (negative(indy)) return false;  // didn't find it.
  cromp_client_record *cli = (*_clients)[indy];
    // pull out the address from the record at that index.
  found = cli->spock()->remote();
  return true;
}

outcome cromp_server::accept_one_client(bool wait)
{
  FUNCDEF("accept_one_client");
  spocket *accepted = NIL;
//printf((timestamp(true, true) + "into accept\n").s());
  outcome ret = spock()->accept(accepted, wait);
//printf((timestamp(true, true) + "out of accept\n").s());
    // accept and wait for it to finish.
  if ( (ret == spocket::OKAY) && accepted) {
    // we got a new client to talk to.
    cromp_client_record *adding = new cromp_client_record(*this, accepted,
        octo());
#ifdef DEBUG_CROMP_SERVER
    LOG(isprintf("found a new client on sock %d.", accepted->OS_socket()));
#endif
    LOCK_LISTS;  // short term lock.
    _clients->append(adding);
    return OKAY;
  } else {
    if (ret == spocket::NO_CONNECTION)
      return NOT_FOUND;  // normal occurrence.
#ifdef DEBUG_CROMP_SERVER
    LOG(istring("error accepting client: ") + spocket::outcome_name(ret));
#endif
    return DISALLOWED;
  }
}

void cromp_server::look_for_clients(ithread &requestor)
{
  FUNCDEF("look_for_clients");
  // see if any clients have been accepted.
  while (!requestor.should_stop()) {
    outcome ret = accept_one_client(false);
    if ( (ret != OKAY) && (ret != NOT_FOUND) ) {
      // we got an error condition besides our normal set.
//#ifdef DEBUG_CROMP_SERVER
      LOG(istring("got real error on socket; leaving for good.")
          + spocket::outcome_name(ret));
//#endif
      break;
    }
    // if we weren't told we got a client, then we'll sleep.  if we did get
    // a client, we'll try again right away.
    if (ret != OKAY)
      portable::sleep_ms(ACCEPTANCE_SNOOZE);
  }
}

outcome cromp_server::send_to_client(const octopus_request_id &id,
    infoton *data)
{
  FUNCDEF("send_to_client");
  if (!octo()->responses().add_item(data, id)) {
#ifdef DEBUG_CROMP_SERVER
    LOG("failed to store result for client--no space left currently.");
#endif
    return TOO_FULL;
  }
  return OKAY;
}

outcome cromp_server::get_from_client(const octopus_request_id &id,
    infoton * &data, int timeout)
{
  FUNCDEF("get_from_client");
//hmmm: this implementation locks the lists; can't we get the client to do
//      most of the work for this?
  LOCK_LISTS;
  int indy = _clients->find(id._entity);
  if (negative(indy)) return NOT_FOUND;  // didn't find it.
  cromp_client_record *cli = (*_clients)[indy];
  return cli->retrieve_and_restore(data, id, timeout);
}

void cromp_server::drop_dead_clients()
{
  FUNCDEF("drop_dead_clients");
  // clean out any dead clients.

  {
    LOCK_LISTS;
    if (time_stamp() < *_next_droppage) return;  // not time yet.
  }

  LOCK_LISTS;  // keep locked from now on.
  for (int i = 0; i < _clients->elements(); i++) {
    cromp_client_record *cli = (*_clients)[i];
    if (!cli) {
#ifdef DEBUG_CROMP_SERVER
      LOG(istring("error in list structure."));
#endif
      _clients->zap(i, i);
      i--;   // skip back before deleted guy.
      continue;
    }
    if (!cli->still_connected() || !cli->healthy()) {
#ifdef DEBUG_CROMP_SERVER
      LOG(istring("dropping disconnected client ") + cli->ent().mangled_form());
#endif
      cli->croak();  // stop it from operating.

//hmmm: check if it has data waiting and complain about it perhaps.
      _clients->zap(i, i);
      i--;   // skip back before deleted guy.
      continue;
    }
  }

  _next_droppage->reset(DEAD_CLIENT_CLEANING_INTERVAL);
}


#endif //CROMP_SERVER_IMPLEMENTATION_FILE

