//
// basic_socket_streambuf.hpp
// ~~~~~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2012 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef BOOST_ASIO_BASIC_SOCKET_STREAMBUF_HPP
#define BOOST_ASIO_BASIC_SOCKET_STREAMBUF_HPP

#if defined(_MSC_VER) && (_MSC_VER >= 1200)
# pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)

#include <boost/asio/detail/config.hpp>

#if !defined(BOOST_NO_IOSTREAM)

#include <streambuf>
#include <boost/utility/base_from_member.hpp>
#include <boost/asio/basic_socket.hpp>
#include <boost/asio/deadline_timer_service.hpp>
#include <boost/asio/detail/array.hpp>
#include <boost/asio/detail/throw_error.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/asio/stream_socket_service.hpp>
#include <boost/asio/time_traits.hpp>

#include <boost/asio/detail/push_options.hpp>
#include <boost/date_time/posix_time/posix_time_types.hpp>
#include <boost/asio/detail/pop_options.hpp>

#if !defined(BOOST_ASIO_HAS_VARIADIC_TEMPLATES)

# include <boost/preprocessor/arithmetic/inc.hpp>
# include <boost/preprocessor/repetition/enum_binary_params.hpp>
# include <boost/preprocessor/repetition/enum_params.hpp>
# include <boost/preprocessor/repetition/repeat_from_to.hpp>

# if !defined(BOOST_ASIO_SOCKET_STREAMBUF_MAX_ARITY)
#  define BOOST_ASIO_SOCKET_STREAMBUF_MAX_ARITY 5
# endif // !defined(BOOST_ASIO_SOCKET_STREAMBUF_MAX_ARITY)

// A macro that should expand to:
//   template <typename T1, ..., typename Tn>
//   basic_socket_streambuf<Protocol, StreamSocketService,
//     Time, TimeTraits, TimerService>* connect(
//       T1 x1, ..., Tn xn)
//   {
//     init_buffers();
//     this->basic_socket<Protocol, StreamSocketService>::close(ec_);
//     typedef typename Protocol::resolver resolver_type;
//     typedef typename resolver_type::query resolver_query;
//     resolver_query query(x1, ..., xn);
//     resolve_and_connect(query);
//     return !ec_ ? this : 0;
//   }
// This macro should only persist within this file.

# define BOOST_ASIO_PRIVATE_CONNECT_DEF( z, n, data ) \
  template <BOOST_PP_ENUM_PARAMS(n, typename T)> \
  basic_socket_streambuf<Protocol, StreamSocketService, \
    Time, TimeTraits, TimerService>* connect( \
      BOOST_PP_ENUM_BINARY_PARAMS(n, T, x)) \
  { \
    init_buffers(); \
    this->basic_socket<Protocol, StreamSocketService>::close(ec_); \
    typedef typename Protocol::resolver resolver_type; \
    typedef typename resolver_type::query resolver_query; \
    resolver_query query(BOOST_PP_ENUM_PARAMS(n, x)); \
    resolve_and_connect(query); \
    return !ec_ ? this : 0; \
  } \
  /**/

#endif // !defined(BOOST_ASIO_HAS_VARIADIC_TEMPLATES)

#include <boost/asio/detail/push_options.hpp>

namespace boost {
namespace asio {

/// Iostream streambuf for a socket.
template <typename Protocol,
    typename StreamSocketService = stream_socket_service<Protocol>,
    typename Time = boost::posix_time::ptime,
    typename TimeTraits = boost::asio::time_traits<Time>,
    typename TimerService = deadline_timer_service<Time, TimeTraits> >
class basic_socket_streambuf
  : public std::streambuf,
    private boost::base_from_member<io_service>,
    public basic_socket<Protocol, StreamSocketService>
{
public:
  /// The endpoint type.
  typedef typename Protocol::endpoint endpoint_type;

  /// The time type.
  typedef typename TimeTraits::time_type time_type;

  /// The duration type.
  typedef typename TimeTraits::duration_type duration_type;

  /// Construct a basic_socket_streambuf without establishing a connection.
  basic_socket_streambuf()
    : basic_socket<Protocol, StreamSocketService>(
        boost::base_from_member<boost::asio::io_service>::member),
      unbuffered_(false),
      timer_service_(0),
      timer_state_(no_timer)
  {
    init_buffers();
  }

  /// Destructor flushes buffered data.
  virtual ~basic_socket_streambuf()
  {
    if (pptr() != pbase())
      overflow(traits_type::eof());

    destroy_timer();
  }

  /// Establish a connection.
  /**
   * This function establishes a connection to the specified endpoint.
   *
   * @return \c this if a connection was successfully established, a null
   * pointer otherwise.
   */
  basic_socket_streambuf<Protocol, StreamSocketService,
    Time, TimeTraits, TimerService>* connect(
      const endpoint_type& endpoint)
  {
    init_buffers();

    this->basic_socket<Protocol, StreamSocketService>::close(ec_);

    if (timer_state_ == timer_has_expired)
    {
      ec_ = boost::asio::error::operation_aborted;
      return 0;
    }

    io_handler handler = { this };
    this->basic_socket<Protocol, StreamSocketService>::async_connect(
        endpoint, handler);

    ec_ = boost::asio::error::would_block;
    this->get_service().get_io_service().reset();
    do this->get_service().get_io_service().run_one();
    while (ec_ == boost::asio::error::would_block);

    return !ec_ ? this : 0;
  }

#if defined(GENERATING_DOCUMENTATION)
  /// Establish a connection.
  /**
   * This function automatically establishes a connection based on the supplied
   * resolver query parameters. The arguments are used to construct a resolver
   * query object.
   *
   * @return \c this if a connection was successfully established, a null
   * pointer otherwise.
   */
  template <typename T1, ..., typename TN>
  basic_socket_streambuf<Protocol, StreamSocketService>* connect(
      T1 t1, ..., TN tn);
#elif defined(BOOST_ASIO_HAS_VARIADIC_TEMPLATES)
  template <typename... T>
  basic_socket_streambuf<Protocol, StreamSocketService,
    Time, TimeTraits, TimerService>* connect(T... x)
  {
    init_buffers();
    this->basic_socket<Protocol, StreamSocketService>::close(ec_);
    typedef typename Protocol::resolver resolver_type;
    typedef typename resolver_type::query resolver_query;
    resolver_query query(x...);
    resolve_and_connect(query);
    return !ec_ ? this : 0;
  }
#else
  BOOST_PP_REPEAT_FROM_TO(
      1, BOOST_PP_INC(BOOST_ASIO_SOCKET_STREAMBUF_MAX_ARITY),
      BOOST_ASIO_PRIVATE_CONNECT_DEF, _ )
#endif

  /// Close the connection.
  /**
   * @return \c this if a connection was successfully established, a null
   * pointer otherwise.
   */
  basic_socket_streambuf<Protocol, StreamSocketService,
    Time, TimeTraits, TimerService>* close()
  {
    sync();
    this->basic_socket<Protocol, StreamSocketService>::close(ec_);
    if (!ec_)
      init_buffers();
    return !ec_ ? this : 0;
  }

  /// Get the last error associated with the stream buffer.
  /**
   * @return An \c error_code corresponding to the last error from the stream
   * buffer.
   */
  const boost::system::error_code& puberror() const
  {
    return error();
  }

  /// Get the stream buffer's expiry time as an absolute time.
  /**
   * @return An absolute time value representing the stream buffer's expiry
   * time.
   */
  time_type expires_at() const
  {
    return timer_service_
      ? timer_service_->expires_at(timer_implementation_)
      : time_type();
  }

  /// Set the stream buffer's expiry time as an absolute time.
  /**
   * This function sets the expiry time associated with the stream. Stream
   * operations performed after this time (where the operations cannot be
   * completed using the internal buffers) will fail with the error
   * boost::asio::error::operation_aborted.
   *
   * @param expiry_time The expiry time to be used for the stream.
   */
  void expires_at(const time_type& expiry_time)
  {
    construct_timer();

    boost::system::error_code ec;
    timer_service_->expires_at(timer_implementation_, expiry_time, ec);
    boost::asio::detail::throw_error(ec, "expires_at");

    start_timer();
  }

  /// Get the stream buffer's expiry time relative to now.
  /**
   * @return A relative time value representing the stream buffer's expiry time.
   */
  duration_type expires_from_now() const
  {
    return TimeTraits::subtract(expires_at(), TimeTraits::now());
  }

  /// Set the stream buffer's expiry time relative to now.
  /**
   * This function sets the expiry time associated with the stream. Stream
   * operations performed after this time (where the operations cannot be
   * completed using the internal buffers) will fail with the error
   * boost::asio::error::operation_aborted.
   *
   * @param expiry_time The expiry time to be used for the timer.
   */
  void expires_from_now(const duration_type& expiry_time)
  {
    construct_timer();

    boost::system::error_code ec;
    timer_service_->expires_from_now(timer_implementation_, expiry_time, ec);
    boost::asio::detail::throw_error(ec, "expires_from_now");

    start_timer();
  }

protected:
  int_type underflow()
  {
    if (gptr() == egptr())
    {
      if (timer_state_ == timer_has_expired)
      {
        ec_ = boost::asio::error::operation_aborted;
        return traits_type::eof();
      }

      io_handler handler = { this };
      this->get_service().async_receive(this->get_implementation(),
          boost::asio::buffer(boost::asio::buffer(get_buffer_) + putback_max),
          0, handler);

      ec_ = boost::asio::error::would_block;
      this->get_service().get_io_service().reset();
      do this->get_service().get_io_service().run_one();
      while (ec_ == boost::asio::error::would_block);
      if (ec_)
        return traits_type::eof();

      setg(&get_buffer_[0], &get_buffer_[0] + putback_max,
          &get_buffer_[0] + putback_max + bytes_transferred_);
      return traits_type::to_int_type(*gptr());
    }
    else
    {
      return traits_type::eof();
    }
  }

  int_type overflow(int_type c)
  {
    if (unbuffered_)
    {
      if (traits_type::eq_int_type(c, traits_type::eof()))
      {
        // Nothing to do.
        return traits_type::not_eof(c);
      }
      else
      {
        if (timer_state_ == timer_has_expired)
        {
          ec_ = boost::asio::error::operation_aborted;
          return traits_type::eof();
        }

        // Send the single character immediately.
        char_type ch = traits_type::to_char_type(c);
        io_handler handler = { this };
        this->get_service().async_send(this->get_implementation(),
            boost::asio::buffer(&ch, sizeof(char_type)), 0, handler);

        ec_ = boost::asio::error::would_block;
        this->get_service().get_io_service().reset();
        do this->get_service().get_io_service().run_one();
        while (ec_ == boost::asio::error::would_block);
        if (ec_)
          return traits_type::eof();

        return c;
      }
    }
    else
    {
      // Send all data in the output buffer.
      boost::asio::const_buffer buffer =
        boost::asio::buffer(pbase(), pptr() - pbase());
      while (boost::asio::buffer_size(buffer) > 0)
      {
        if (timer_state_ == timer_has_expired)
        {
          ec_ = boost::asio::error::operation_aborted;
          return traits_type::eof();
        }

        io_handler handler = { this };
        this->get_service().async_send(this->get_implementation(),
            boost::asio::buffer(buffer), 0, handler);

        ec_ = boost::asio::error::would_block;
        this->get_service().get_io_service().reset();
        do this->get_service().get_io_service().run_one();
        while (ec_ == boost::asio::error::would_block);
        if (ec_)
          return traits_type::eof();

        buffer = buffer + bytes_transferred_;
      }
      setp(&put_buffer_[0], &put_buffer_[0] + put_buffer_.size());

      // If the new character is eof then our work here is done.
      if (traits_type::eq_int_type(c, traits_type::eof()))
        return traits_type::not_eof(c);

      // Add the new character to the output buffer.
      *pptr() = traits_type::to_char_type(c);
      pbump(1);
      return c;
    }
  }

  int sync()
  {
    return overflow(traits_type::eof());
  }

  std::streambuf* setbuf(char_type* s, std::streamsize n)
  {
    if (pptr() == pbase() && s == 0 && n == 0)
    {
      unbuffered_ = true;
      setp(0, 0);
      return this;
    }

    return 0;
  }

  /// Get the last error associated with the stream buffer.
  /**
   * @return An \c error_code corresponding to the last error from the stream
   * buffer.
   */
  virtual const boost::system::error_code& error() const
  {
    return ec_;
  }

private:
  void init_buffers()
  {
    setg(&get_buffer_[0],
        &get_buffer_[0] + putback_max,
        &get_buffer_[0] + putback_max);
    if (unbuffered_)
      setp(0, 0);
    else
      setp(&put_buffer_[0], &put_buffer_[0] + put_buffer_.size());
  }

  template <typename ResolverQuery>
  void resolve_and_connect(const ResolverQuery& query)
  {
    typedef typename Protocol::resolver resolver_type;
    typedef typename resolver_type::iterator iterator_type;
    resolver_type resolver(
        boost::base_from_member<boost::asio::io_service>::member);
    iterator_type i = resolver.resolve(query, ec_);
    if (!ec_)
    {
      iterator_type end;
      ec_ = boost::asio::error::host_not_found;
      while (ec_ && i != end)
      {
        this->basic_socket<Protocol, StreamSocketService>::close(ec_);

        if (timer_state_ == timer_has_expired)
        {
          ec_ = boost::asio::error::operation_aborted;
          return;
        }

        io_handler handler = { this };
        this->basic_socket<Protocol, StreamSocketService>::async_connect(
            *i, handler);

        ec_ = boost::asio::error::would_block;
        this->get_service().get_io_service().reset();
        do this->get_service().get_io_service().run_one();
        while (ec_ == boost::asio::error::would_block);

        ++i;
      }
    }
  }

  struct io_handler;
  friend struct io_handler;
  struct io_handler
  {
    basic_socket_streambuf* this_;

    void operator()(const boost::system::error_code& ec,
        std::size_t bytes_transferred = 0)
    {
      this_->ec_ = ec;
      this_->bytes_transferred_ = bytes_transferred;
    }
  };

  struct timer_handler;
  friend struct timer_handler;
  struct timer_handler
  {
    basic_socket_streambuf* this_;

    void operator()(const boost::system::error_code&)
    {
      time_type now = TimeTraits::now();

      time_type expiry_time = this_->timer_service_->expires_at(
            this_->timer_implementation_);

      if (TimeTraits::less_than(now, expiry_time))
      {
        this_->timer_state_ = timer_is_pending;
        this_->timer_service_->async_wait(this_->timer_implementation_, *this);
      }
      else
      {
        this_->timer_state_ = timer_has_expired;
        boost::system::error_code ec;
        this_->basic_socket<Protocol, StreamSocketService>::close(ec);
      }
    }
  };

  void construct_timer()
  {
    if (timer_service_ == 0)
    {
      TimerService& timer_service = use_service<TimerService>(
          boost::base_from_member<boost::asio::io_service>::member);
      timer_service.construct(timer_implementation_);
      timer_service_ = &timer_service;
    }
  }

  void destroy_timer()
  {
    if (timer_service_)
      timer_service_->destroy(timer_implementation_);
  }

  void start_timer()
  {
    if (timer_state_ != timer_is_pending)
    {
      timer_handler handler = { this };
      handler(boost::system::error_code());
    }
  }

  enum { putback_max = 8 };
  enum { buffer_size = 512 };
  boost::asio::detail::array<char, buffer_size> get_buffer_;
  boost::asio::detail::array<char, buffer_size> put_buffer_;
  bool unbuffered_;
  boost::system::error_code ec_;
  std::size_t bytes_transferred_;
  TimerService* timer_service_;
  typename TimerService::implementation_type timer_implementation_;
  enum state { no_timer, timer_is_pending, timer_has_expired } timer_state_;
};

} // namespace asio
} // namespace boost

#include <boost/asio/detail/pop_options.hpp>

#if !defined(BOOST_ASIO_HAS_VARIADIC_TEMPLATES)
# undef BOOST_ASIO_PRIVATE_CONNECT_DEF
#endif // !defined(BOOST_ASIO_HAS_VARIADIC_TEMPLATES)

#endif // !defined(BOOST_NO_IOSTREAM)

#endif // BOOST_ASIO_BASIC_SOCKET_STREAMBUF_HPP