Skip to content

Commit

Permalink
Implement datachannel close
Browse files Browse the repository at this point in the history
Sets SCTP_RESET_STREAMS on the socket to close
the datachannel.
  • Loading branch information
mo3rfan committed Apr 6, 2017
1 parent 9eb15e4 commit 6f98e0e
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 3 deletions.
1 change: 1 addition & 0 deletions include/rtcdcpp/DataChannel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ namespace rtcdcpp {
#define PPID_BINARY_EMPTY 57

// DataChannel Control Types
#define DC_TYPE_CLOSE 0x04
#define DC_TYPE_OPEN 0x03
#define DC_TYPE_ACK 0x02

Expand Down
3 changes: 3 additions & 0 deletions include/rtcdcpp/PeerConnection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,13 @@ class PeerConnection {

// DataChannel message parsing
void HandleNewDataChannel(ChunkPtr chunk, uint16_t sid);
void HandleDataChannelClose(uint16_t sid);
void HandleStringMessage(ChunkPtr chunk, uint16_t sid);
void HandleBinaryMessage(ChunkPtr chunk, uint16_t sid);

std::shared_ptr<Logger> logger = GetLogger("rtcdcpp.PeerConnection");
public:
void ResetSCTPStream(uint16_t stream_id);

};
}
1 change: 1 addition & 0 deletions include/rtcdcpp/SCTPWrapper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class SCTPWrapper {
bool Initialize();
void Start();
void Stop();
void ResetSCTPStream(uint16_t stream_id);
// int GetStreamCursor();
// void SetStreamCursor(int i);

Expand Down
8 changes: 5 additions & 3 deletions src/DataChannel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ DataChannel::DataChannel(PeerConnection *pc, uint16_t stream_id, uint8_t chan_ty
error_cb = [](std::string x) { ; };
}

DataChannel::~DataChannel() { ; }
DataChannel::~DataChannel() { DataChannel::Close(); }

uint16_t DataChannel::GetStreamID() { return this->stream_id; }

Expand All @@ -57,9 +57,11 @@ std::string DataChannel::GetLabel() { return this->label; }
std::string DataChannel::GetProtocol() { return this->protocol; }

/**
* TODO: Close the DataChannel.
* Close the DataChannel.
*/
void Close() { ; }
void Close() {
this->pc->ResetSCTPStream(GetStreamID());
}

bool DataChannel::SendString(std::string msg) {
std::cerr << "DC: Sending string: " << msg << std::endl;
Expand Down
17 changes: 17 additions & 0 deletions src/PeerConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,9 @@ void PeerConnection::OnSCTPMsgReceived(ChunkPtr chunk, uint16_t sid, uint32_t pp
} else if (chunk->Data()[0] == DC_TYPE_ACK) {
SPDLOG_TRACE(logger, "DC ACK");
// HandleDataChannelAck(chunk, sid); XXX: Don't care right now
} else if (chunk->Data()[0] == DC_TYPE_CLOSE) {
SPDLOG_TRACE(logger, "DC CLOSE");
HandleDataChannelClose(sid);
} else {
SPDLOG_TRACE(logger, "Unknown msg_type for ppid control: {}", chunk->Data()[0]);
}
Expand Down Expand Up @@ -240,6 +243,15 @@ void PeerConnection::HandleNewDataChannel(ChunkPtr chunk, uint16_t sid) {
}
}

void PeerConnection::HandleDataChannelClose(uint16_t sid) {
auto cur_channel = GetChannel(sid);
if (!cur_channel) {
logger->warn("Received close for unknown channel: {}", sid);
return;
}
cur_channel->OnClosed();
}

void PeerConnection::HandleStringMessage(ChunkPtr chunk, uint16_t sid) {
auto cur_channel = GetChannel(sid);
if (!cur_channel) {
Expand Down Expand Up @@ -271,4 +283,9 @@ void PeerConnection::SendBinaryMsg(const uint8_t *data, int len, uint16_t sid) {
auto cur_msg = std::make_shared<Chunk>(data, len);
this->sctp->GSForSCTP(cur_msg, sid, PPID_BINARY);
}

void PeerConnection::ResetSCTPStream(uint16_t stream_id) {
this->sctp->ResetSCTPStream(stream_id);
}

}
52 changes: 52 additions & 0 deletions src/SCTPWrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,42 @@ void SCTPWrapper::OnNotification(union sctp_notification *notify, size_t len) {
break;
case SCTP_STREAM_RESET_EVENT:
SPDLOG_TRACE(logger, "OnNotification(type=SCTP_STREAM_RESET_EVENT)");
struct sctp_stream_reset_event reset_event;
reset_event = notify->sn_strreset_event;
for (int i = 1; i < 2; i++) {
uint16_t streamid = reset_event.strreset_stream_list[i];
uint16_t set_flags;
if (reset_event.strreset_flags != 0) {
if ((reset_event.strreset_flags ^ SCTP_STREAM_RESET_INCOMING_SSN) == 0) {
set_flags = SCTP_STREAM_RESET_INCOMING;
}
if ((reset_event.strreset_flags ^ SCTP_STREAM_RESET_OUTGOING_SSN) == 0) {
//fires when we close the stream from our side explicity or
//as a result of remote close or some error.

logger->info("Outgoing stream_id#{} have been reset, calling onClose CB", streamid);
const uint8_t dc_close_data = DC_TYPE_CLOSE;
const uint8_t *dc_close_ptr = &dc_close_data;
OnMsgReceived(dc_close_ptr, sizeof(dc_close_ptr), streamid + 1, PPID_CONTROL);
//The above signals to call our onClose callback
}
if ((reset_event.strreset_flags ^ SCTP_STREAM_RESET_DENIED) == 0) {
logger->error("Stream reset denied by peer");
}
if ((reset_event.strreset_flags ^ SCTP_STREAM_RESET_FAILED) == 0) {
logger->error("Stream reset failed");
}
} else {
continue;
}
if (set_flags == SCTP_STREAM_RESET_INCOMING) {
// Reset the stream when a remote close is received.
logger->info("SCTP Reset received for stream_id#{} from remote", streamid);
ResetSCTPStream(streamid + 1, set_flags);
// This will cause another event SCTP_STREAM_RESET_OUTGOING_SSN
// where we can finally call our callbacks.
}
}
break;
case SCTP_ASSOC_RESET_EVENT:
SPDLOG_TRACE(logger, "OnNotification(type=SCTP_ASSOC_RESET_EVENT)");
Expand Down Expand Up @@ -291,6 +327,22 @@ void SCTPWrapper::Stop() {
usrsctp_close(sock);
sock = nullptr;
}
usrsctp_deregister_address(this);
}

void SCTPWrapper::ResetSCTPStream(uint16_t stream_id, uint16_t srs_flags) {
struct sctp_reset_streams stream_close;
size_t no_of_streams = 1;
size_t len = sizeof(sctp_assoc_t) + (2 + no_of_streams) * sizeof(uint16_t);
memset(&stream_close, 0, len);
stream_close.srs_flags = srs_flags;
stream_close.srs_number_streams = no_of_streams;
stream_close.srs_stream_list[0] = stream_id;
if (usrsctp_setsockopt(this->sock, IPPROTO_SCTP, SCTP_RESET_STREAMS, &stream_close, (socklen_t)len) == -1) {
logger->error("Could not set socket options for SCTP_RESET_STREAMS. errno={}", errno);
} else {
logger->info("SCTP_RESET_STREAMS socket option has been set successfully");
}
}

void SCTPWrapper::DTLSForSCTP(ChunkPtr chunk) { this->recv_queue.push(chunk); }
Expand Down

0 comments on commit 6f98e0e

Please sign in to comment.