Skip to content
11 changes: 10 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ set(TRANTOR_SOURCES
trantor/net/inner/Connector.cc
trantor/net/inner/Poller.cc
trantor/net/inner/Socket.cc
trantor/net/inner/MemBufferNode.cc
trantor/net/inner/StreamBufferNode.cc
trantor/net/inner/AsyncStreamBufferNode.cc
trantor/net/inner/TcpConnectionImpl.cc
trantor/net/inner/Timer.cc
trantor/net/inner/TimerQueue.cc
Expand All @@ -126,11 +129,16 @@ if(WIN32)
set(TRANTOR_SOURCES
${TRANTOR_SOURCES}
third_party/wepoll/Wepoll.c
trantor/utils/WindowsSupport.cc)
trantor/utils/WindowsSupport.cc
trantor/net/inner/FileBufferNodeWin.cc)
set(private_headers
${private_headers}
third_party/wepoll/Wepoll.h
trantor/utils/WindowsSupport.h)
else(WIN32)
set(TRANTOR_SOURCES
${TRANTOR_SOURCES}
trantor/net/inner/FileBufferNodeUnix.cc)
endif(WIN32)

# Somehow the default value of TRANTOR_USE_TLS is OFF
Expand Down Expand Up @@ -283,6 +291,7 @@ set(public_net_headers
trantor/net/TcpClient.h
trantor/net/TcpConnection.h
trantor/net/TcpServer.h
trantor/net/AsyncStream.h
trantor/net/callbacks.h
trantor/net/Resolver.h
trantor/net/Channel.h
Expand Down
39 changes: 39 additions & 0 deletions trantor/net/AsyncStream.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/**
*
* @file AsyncStream.h
* @author An Tao
*
* Public header file in trantor lib.
*
* Copyright 2023, An Tao. All rights reserved.
* Use of this source code is governed by a BSD-style license
* that can be found in the License file.
*
*
*/

#pragma once

#include <trantor/utils/NonCopyable.h>
#include <memory>

namespace trantor
{
/**
* @brief This class represents a data stream that can be sent asynchronously.
* The data is sent in chunks, and the chunks are sent in order, and all the
* chunks are sent continuously.
*/
class TRANTOR_EXPORT AsyncStream : public NonCopyable
{
public:
virtual ~AsyncStream() = default;
virtual void send(const char *msg, size_t len) = 0;
void send(const std::string &msg)
{
send(msg.data(), msg.length());
}
virtual void close() = 0;
};
using AsyncStreamPtr = std::unique_ptr<AsyncStream>;
} // namespace trantor
8 changes: 8 additions & 0 deletions trantor/net/TcpConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <trantor/net/callbacks.h>
#include <trantor/net/Certificate.h>
#include <trantor/net/TLSPolicy.h>
#include <trantor/net/AsyncStream.h>
#include <memory>
#include <functional>
#include <string>
Expand Down Expand Up @@ -95,11 +96,18 @@ class TRANTOR_EXPORT TcpConnection
callback) = 0; // (buffer, buffer size) -> size
// of data put in buffer

/**
* @brief Send a stream to the peer asynchronously.
* @note The subsequent data sent after the async stream will be sent after
* the stream is closed.
*/
virtual AsyncStreamPtr sendAsyncStream() = 0;
/**
* @brief Get the local address of the connection.
*
* @return const InetAddress&
*/

virtual const InetAddress &localAddr() const = 0;

/**
Expand Down
65 changes: 65 additions & 0 deletions trantor/net/inner/AsyncStreamBufferNode.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#include <trantor/net/inner/BufferNode.h>

namespace trantor
{
class AsyncBufferNode : public BufferNode
{
public:
AsyncBufferNode() = default;
~AsyncBufferNode() override = default;
bool isAsync() const override
{
return true;
}
bool isStream() const override
{
return true;
}
size_t remainingBytes() const override
{
if (msgBufferPtr_)
return msgBufferPtr_->readableBytes();
return 0;
}
bool available() const override
{
return !isDone_;
}
void getData(const char *&data, size_t &len) override
{
if (msgBufferPtr_)
{
data = msgBufferPtr_->peek();
len = msgBufferPtr_->readableBytes();
}
else
{
data = nullptr;
len = 0;
}
}
void retrieve(size_t len) override
{
assert(msgBufferPtr_);
if (msgBufferPtr_)
{
msgBufferPtr_->retrieve(len);
}
}
void append(const char *data, size_t len) override
{
if (!msgBufferPtr_)
{
msgBufferPtr_ = std::make_unique<MsgBuffer>(len);
}
msgBufferPtr_->append(data, len);
}

private:
std::unique_ptr<MsgBuffer> msgBufferPtr_;
};
BufferNodePtr BufferNode::newAsyncStreamBufferNode()
{
return std::make_shared<AsyncBufferNode>();
}
} // namespace trantor
86 changes: 86 additions & 0 deletions trantor/net/inner/BufferNode.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/**
*
* @file BufferNode.h
* @author An Tao
*
* Public header file in trantor lib.
*
* Copyright 2018, An Tao. All rights reserved.
* Use of this source code is governed by a BSD-style license
* that can be found in the License file.
*
*
*/

#pragma once
#ifdef _WIN32
#include <stdio.h>
#endif
#include <trantor/utils/MsgBuffer.h>
#include <trantor/utils/NonCopyable.h>
#include <trantor/utils/Logger.h>
#include <functional>
#include <memory>
#include <string>

namespace trantor
{
class BufferNode;
using BufferNodePtr = std::shared_ptr<BufferNode>;
using StreamCallback = std::function<std::size_t(char *, std::size_t)>;
class BufferNode : public NonCopyable
{
public:
virtual bool isFile() const
{
return false;
}
virtual ~BufferNode() = default;
virtual bool isStream() const
{
return false;
}
virtual void getData(const char *&data, size_t &len) = 0;
virtual void append(const char *, size_t)
{
LOG_FATAL << "Not a memory buffer node";
}
virtual void retrieve(size_t len) = 0;
virtual size_t remainingBytes() const = 0;
virtual int getFd() const
{
LOG_FATAL << "Not a file buffer node";
return -1;
}
virtual bool available() const
{
return true;
}
virtual bool isAsync() const
{
return false;
}

void done()
{
isDone_ = true;
}
static BufferNodePtr newMemBufferNode();

static BufferNodePtr newStreamBufferNode(StreamCallback &&cb);
#ifdef _WIN32
static BufferNodePtr newFileBufferNode(const wchar_t *fileName,
long long offset,
size_t length);
#else
static BufferNodePtr newFileBufferNode(const char *fileName,
off_t offset,
size_t length);
#endif
static BufferNodePtr newAsyncStreamBufferNode();

protected:
bool isDone_{false};
};

} // namespace trantor
Loading