複数のDataStreamを利用する
SkyWay Linux®︎ SDK (以下、Linux SDK ) のクイックスタートでは、複数のDataStreamをPublish、Subscribeできるように実装されていません。
本ドキュメントのように実装することで複数のDataStreamを扱うことができます。
注意 本ドキュメントはクイックスタートをベースとしたカスタマイズ方法を提示しています。
詳しい実装についてはクイックスタートを確認してください。
Publish
クイックスタートのサンプルは1つのDataStreamをPublishする実装となっています。
そのため、複数のStreamを扱うためにExampleRoomクラスの DataStream を管理するメンバーを以下に書き換えます。
std::vector<std::shared_ptr<skyway::core::stream::local::LocalDataStream>> data_streams_;// DataStreamをPublishします。 というコメントに続く DataStream の送信処理を以下の内容に書き換えます。
const int data_stream_num = 2
for (int i = 0; i < data_stream_num; i++) {
auto stream = skyway::media::StreamFactory::CreateDataStream();
skyway::room::interface::LocalRoomMember::PublicationOptions publication_options{};
auto publication = room_member_->Publish(stream, publication_options);
if (publication) {
std::cout << "- DataStream Published" << std::endl;
std::cout << " - Publication Id: " << publication->Id() << std::endl;
}
data_streams_.emplace_back(stream);
auto data_thread = std::make_unique<std::thread>(
[this, stream, i] {
while (!is_leaving_) {
int random = rand() % 10 + 1; // 1〜10のランダムな整数
auto msg = "send msg: " + std::to_string(random);
stream->Write(msg);
std::cout << "- DataStream[" << i << "] Message Send: " << msg << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
});
threads_.emplace_back(std::move(data_thread));
}このサンプルでは1〜10までのランダムな整数のデータを送信します。
Subscribe
Subscribeした複数の DataStream に Listener を登録する際は、それぞれのStreamに別々のListenerの実体を登録する必要があります。
クイックスタートのサンプルはExampleRoom自身を DataStream の Listener としているため、Listener の実体がひとつしかなく複数の DataStream に登録することができません。
それぞれの DataStream に Listener を用意するため、Listener 部分を専用のクラスに分離します。
まずは、以下のクラスを作成してください。
// data_stream_listener.hpp
#include <skyway/core/stream/remote/data_stream.hpp>
#include <iostream>
class DataStreamListener : public skyway::core::stream::remote::RemoteDataStream::Listener {
public:
DataStreamListener() {}
// Impl RemoteDataStream::Listener
void OnData(const std::string& data) override {
std::cout << "- [Event] DataStream Message Received: " << data << std::endl;
}
void OnDataBuffer(const uint8_t* data, size_t length) override {
std::cout << "- [Event] DataStream BinaryData Received: Length=" << length << std::endl;
}
};その後、example_room.hpp と example_room.cppからリスナー実装を削除し、example_room.hppに以下のincludeディレクティブを追加してください。
#include "data_stream_listener.hpp"複数のSubscriptionを保持するために、メンバー変数を以下のように変更します。
std::vector<std::shared_ptr<DataStreamListener>> data_listeners_;クイックスタートではExampleRoom自身をリスナー登録していましたが、DataStreamListener を生成して登録するように変更してください。
auto listener = std::make_shared<DataStreamListener>();
data_stream->AddListener(listener.get());
data_listeners_.emplace_back(listener);このように変更することで、複数のDataStreamを同時にSubscribeできます。
コード全体
以下にカスタマイズ後のコード全体を提示します。
example_room.hpp
#include <vector>
#include <skyway/context.hpp>
#include <skyway/media/device_manager.hpp>
#include <skyway/media/stream_factory.hpp>
#include <skyway/media/v4l2_video_renderer.hpp>
#include <skyway/room/p2p/p2p_room.hpp>
#include "data_stream_listener.hpp"
// Roomの操作を行うクラスです。
class ExampleRoom : public skyway::room::interface::Room::EventListener {
public:
ExampleRoom(const std::string& renderer_device_name);
// SkyWayの利用を開始します。
bool Setup(const std::string& app_id, const std::string& secret_key);
// P2PRoomを検索/作成し、入室します。
bool JoinRoom(const std::string& room_name);
// Video/Audio/DataをPublishします。
void Publish();
// 指定のPublicationをSubscribeします。
bool Subscribe(std::shared_ptr<skyway::room::interface::RoomPublication> publication);
// P2PRoomに存在するPublication全てに対してSubscribeを試みます。
void SubscribeAll();
// P2PRoomから退出します。
bool LeaveRoom();
// SkyWayの利用を終了します。
void Dispose();
// Impl skyway::room::interface::Room::EventListener
void OnStreamPublished(
std::shared_ptr<skyway::room::interface::RoomPublication> publication) override;
private:
// 指定のPublicationをSubscribeしているかチェックします。
bool IsSubscribed(std::shared_ptr<skyway::room::interface::RoomPublication> publication);
std::shared_ptr<skyway::room::p2p::P2PRoom> p2proom_;
std::shared_ptr<skyway::room::p2p::LocalP2PRoomMember> room_member_;
std::unique_ptr<skyway::media::V4l2VideoRenderer> renderer_;
std::vector<std::shared_ptr<skyway::core::stream::local::LocalDataStream>> data_streams_;
std::vector<std::shared_ptr<DataStreamReceiver>> data_receivers_;
std::string renderer_device_name_;
std::vector<std::unique_ptr<std::thread>> threads_;
std::atomic<bool> is_leaving_;
};example_room.cpp
Publish(), Subscribe()以外の処理に変更はないため省略します。
// Video/Audio/DataをPublishします。
void ExampleRoom::Publish() {
// ビデオデバイスを列挙し、任意のデバイスをPublishします。
auto video_devices = skyway::media::DeviceManager::GetVideoDevices();
if (video_devices.size() > 0) {
std::cout << "- VideoDevices" << std::endl;
for (auto device : video_devices) {
std::cout << " - Index: " << device.index << " Name: " << device.name << std::endl;
}
// デバイスのindex番号を入力します。
int device_index;
std::cout << "- Enter the index of the video device to be published: ";
std::cin >> device_index;
if (device_index >= 0 && device_index < video_devices.size()) {
auto video_stream =
skyway::media::StreamFactory::CreateVideoStream(video_devices[device_index]);
skyway::room::interface::LocalRoomMember::PublicationOptions publication_options {};
auto publication = room_member_->Publish(video_stream, publication_options);
if (publication) {
std::cout << " - VideoStream Published" << std::endl;
std::cout << " - Publication Id: " << publication->Id() << std::endl;
}
} else {
std::cout << " - Out of range" << std::endl;
}
}
// デフォルトで設定されている音声入力デバイスをPublishします。
auto audio_devices = skyway::media::DeviceManager::GetRecordDevices();
if (audio_devices.size() > 0) {
auto device = audio_devices[0];
skyway::media::DeviceManager::SetRecordingDevice(device);
auto audio_stream = skyway::media::StreamFactory::CreateAudioStream();
skyway::room::interface::LocalRoomMember::PublicationOptions publication_options {};
auto publication = room_member_->Publish(audio_stream, publication_options);
if (publication) {
std::cout << "- AudioStream Published" << std::endl;
std::cout << " - Device Index: " << device.index << std::endl;
std::cout << " - Device Name: " << device.name << std::endl;
std::cout << " - Publication Id: " << publication->Id() << std::endl;
}
}
// DataStreamをPublishします。
const int data_stream_num = 2
for (int i = 0; i < data_stream_num; i++) {
auto stream = skyway::media::StreamFactory::CreateDataStream();
skyway::room::interface::LocalRoomMember::PublicationOptions publication_options{};
auto publication = room_member_->Publish(stream, publication_options);
if (publication) {
std::cout << "- DataStream Published" << std::endl;
std::cout << " - Publication Id: " << publication->Id() << std::endl;
}
data_streams_.emplace_back(stream);
auto data_thread = std::make_unique<std::thread>(
[this, stream, i] {
while (!is_leaving_) {
int random = rand() % 10 + 1; // 1〜10のランダムな整数
auto msg = "send msg: " + std::to_string(random);
stream->Write(msg);
std::cout << "- DataStream[" << i << "] Message Send: " << msg << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
});
threads_.emplace_back(std::move(data_thread));
}
}
// 指定のPublicationをSubscribeします。
bool ExampleRoom::Subscribe(std::shared_ptr<skyway::room::interface::RoomPublication> publication) {
if (room_member_->Id() == publication->Publisher()->Id()) {
// 自身がPublishしたPublicationはSubscribeできないので無視します。
return false;
}
if (this->IsSubscribed(publication)) {
// 既にSubscribeしている場合は無視します。
return false;
}
// PublicationのContentTypeに応じてメディアを出力します。
skyway::room::interface::LocalRoomMember::SubscriptionOptions subscription_options {};
if (publication->ContentType() == skyway::model::ContentType::kVideo) {
if (renderer_device_name_ == "") {
// 出力先を指定していない場合はSubscribeしません。
std::cout << "- VideoStream Subscribe Canceled" << std::endl;
std::cout << " - Video device not specified" << std::endl;
return false;
}
if (renderer_) {
// 既に映像を出力している場合はSubscribeしません。
std::cout << "- VideoStream Subscribe Canceled" << std::endl;
std::cout << " - Already set renderer" << std::endl;
return false;
}
auto subscription = room_member_->Subscribe(publication->Id(), subscription_options);
if (!subscription) {
return false;
}
auto stream = std::dynamic_pointer_cast<skyway::core::stream::remote::RemoteVideoStream>(
subscription->Stream());
// 映像を出力デバイスに書き込みます。
skyway::media::V4l2VideoRendererOptions monitor_opt;
monitor_opt.scaled_width = 1920;
monitor_opt.scaled_height = 1080;
renderer_ =
std::make_unique<skyway::media::V4l2VideoRenderer>(renderer_device_name_, monitor_opt);
renderer_->Render(stream);
std::cout << "- VideoStream Subscribed" << std::endl;
std::cout << " - Device : " << renderer_device_name_ << std::endl;
std::cout << " - Specified Width: " << monitor_opt.scaled_width << std::endl;
std::cout << " - Specified Height: " << monitor_opt.scaled_height << std::endl;
std::cout << " - Publication Id: " << publication->Id() << std::endl;
std::cout << " - Subscription Id: " << subscription->Id() << std::endl;
} else if (publication->ContentType() == skyway::model::ContentType::kAudio) {
auto devices = skyway::media::DeviceManager::GetPlayoutDevices();
if (devices.size() > 0) {
auto device = devices[0];
skyway::media::DeviceManager::SetPlayoutDevice(device);
// 音声はSubscribeと同時に再生されます。
auto subscription = room_member_->Subscribe(publication->Id(), subscription_options);
if (!subscription) {
return false;
}
std::cout << "- AudioStream Subscribed" << std::endl;
std::cout << " - Name: " << device.name << std::endl;
std::cout << " - Publication Id: " << publication->Id() << std::endl;
std::cout << " - Subscription Id: " << subscription->Id() << std::endl;
}
} else if (publication->ContentType() == skyway::model::ContentType::kData) {
auto subscription = room_member_->Subscribe(publication->Id(), subscription_options);
if (!subscription) {
return false;
}
auto data_stream =
std::dynamic_pointer_cast<skyway::core::stream::remote::RemoteDataStream>(
subscription->Stream());
auto listener = std::make_shared<DataStreamListener>();
data_stream->AddListener(listener.get());
data_listeners_.emplace_back(listener);
std::cout << "- DataStream Subscribed" << std::endl;
std::cout << " - Publication Id: " << publication->Id() << std::endl;
std::cout << " - Subscription Id: " << subscription->Id() << std::endl;
}
return true;
}商標
Linux®︎は、米国およびその他の国における Linus Torvalds の登録商標です。