複数のDataStreamを利用する
SkyWay Linux®︎ SDK (以下、Linux SDK ) のクイックスタートでは、複数のDataStreamをPublish、Subscribeできるように実装されていません。
本ドキュメントのように実装することで複数のDataStreamを扱うことができます。
注意 本ドキュメントはクイックスタートをベースとしたカスタマイズ方法を提示しています。
詳しい実装についてはクイックスタートを確認してください。
Publish
クイックスタートのサンプルは1つのDataStreamをPublishする実装となっています。
そのため、複数のStreamを扱うためにExampleRoom
クラスの DataStream を管理するメンバーを以下に書き換えます。
std::vector<std::shared_ptr<skyway::core::stream::local::LocalDataStream>> data_streams_;
// DataStreamをPublishします。
というコメントに続く DataStream の送信処理を以下の内容に書き換えます。
const int data_stream_num = 2 for (int i = 0; i < data_stream_num; i++) { auto stream = skyway::media::StreamFactory::CreateDataStream(); skyway::room::interface::LocalRoomMember::PublicationOptions publication_options{}; auto publication = room_member_->Publish(stream, publication_options); if (publication) { std::cout << "- DataStream Published" << std::endl; std::cout << " - Publication Id: " << publication->Id() << std::endl; } data_streams_.emplace_back(stream); auto data_thread = std::make_unique<std::thread>( [this, stream, i] { while (!is_leaving_) { int random = rand() % 10 + 1; // 1〜10のランダムな整数 auto msg = "send msg: " + std::to_string(random); stream->Write(msg); std::cout << "- DataStream[" << i << "] Message Send: " << msg << std::endl; std::this_thread::sleep_for(std::chrono::seconds(1)); } }); threads_.emplace_back(std::move(data_thread)); }
このサンプルでは1〜10までのランダムな整数のデータを送信します。
Subscribe
Subscribeした複数の DataStream に Listener を登録する際は、それぞれのStreamに別々のListenerの実体を登録する必要があります。
クイックスタートのサンプルはExampleRoom
自身を DataStream の Listener としているため、Listener の実体がひとつしかなく複数の DataStream に登録することができません。
それぞれの DataStream に Listener を用意するため、Listener 部分を専用のクラスに分離します。
まずは、以下のクラスを作成してください。
// data_stream_listener.hpp #include <skyway/core/stream/remote/data_stream.hpp> #include <iostream> class DataStreamListener : public skyway::core::stream::remote::RemoteDataStream::Listener { public: DataStreamListener() {} // Impl RemoteDataStream::Listener void OnData(const std::string& data) override { std::cout << "- [Event] DataStream Message Received: " << data << std::endl; } void OnDataBuffer(const uint8_t* data, size_t length) override { std::cout << "- [Event] DataStream BinaryData Received: Length=" << length << std::endl; } };
その後、example_room.hpp
と example_room.cpp
からリスナー実装を削除し、example_room.hpp
に以下のヘッダーを追加してください。
#include "data_stream_listener.hpp"
複数のSubscriptionを保持するために、メンバー変数を以下のように変更します。
std::vector<std::shared_ptr<DataStreamListener>> data_listeners_;
クイックスタートではExampleRoom
自身をリスナー登録していましたが、DataStreamListener
を生成して登録するように変更してください。
auto listener = std::make_shared<DataStreamListener>(); data_stream->AddListener(listener.get()); data_listeners_.emplace_back(listener);
このように変更することで、複数のDataStreamを同時にSubscribeできます。
コード全体
以下にカスタマイズ後のコード全体を提示します。
example_room.hpp
#include <vector> #include <skyway/context.hpp> #include <skyway/media/device_manager.hpp> #include <skyway/media/stream_factory.hpp> #include <skyway/media/v4l2_video_renderer.hpp> #include <skyway/room/p2p/p2p_room.hpp> #include "data_stream_listener.hpp" // Roomの操作を行うクラスです。 class ExampleRoom : public skyway::room::interface::Room::EventListener { public: ExampleRoom(const std::string& renderer_device_name); // SkyWayの利用を開始します。 bool Setup(const std::string& app_id, const std::string& secret_key); // P2PRoomを検索/作成し、入室します。 bool JoinRoom(const std::string& room_name); // Video/Audio/DataをPublishします。 void Publish(); // 指定のPublicationをSubscribeします。 bool Subscribe(std::shared_ptr<skyway::room::interface::RoomPublication> publication); // P2PRoomに存在するPublication全てに対してSubscribeを試みます。 void SubscribeAll(); // P2PRoomから退出します。 bool LeaveRoom(); // SkyWayの利用を終了します。 void Dispose(); // Impl skyway::room::interface::Room::EventListener void OnStreamPublished( std::shared_ptr<skyway::room::interface::RoomPublication> publication) override; private: // 指定のPublicationをSubscribeしているかチェックします。 bool IsSubscribed(std::shared_ptr<skyway::room::interface::RoomPublication> publication); std::shared_ptr<skyway::room::p2p::P2PRoom> p2proom_; std::shared_ptr<skyway::room::p2p::LocalP2PRoomMember> room_member_; std::unique_ptr<skyway::media::V4l2VideoRenderer> renderer_; std::vector<std::shared_ptr<skyway::core::stream::local::LocalDataStream>> data_streams_; std::vector<std::shared_ptr<DataStreamReceiver>> data_receivers_; std::string renderer_device_name_; std::vector<std::unique_ptr<std::thread>> threads_; std::atomic<bool> is_leaving_; };
example_room.cpp
Publish()
, Subscribe()
以外の処理に変更はないため省略します。
// Video/Audio/DataをPublishします。 void ExampleRoom::Publish() { // ビデオデバイスを列挙し、任意のデバイスをPublishします。 auto video_devices = skyway::media::DeviceManager::GetVideoDevices(); if (video_devices.size() > 0) { std::cout << "- VideoDevices" << std::endl; for (auto device : video_devices) { std::cout << " - Index: " << device.index << " Name: " << device.name << std::endl; } // デバイスのindex番号を入力します。 int device_index; std::cout << "- Enter the index of the video device to be published: "; std::cin >> device_index; if (device_index >= 0 && device_index < video_devices.size()) { auto video_stream = skyway::media::StreamFactory::CreateVideoStream(video_devices[device_index]); skyway::room::interface::LocalRoomMember::PublicationOptions publication_options {}; auto publication = room_member_->Publish(video_stream, publication_options); if (publication) { std::cout << " - VideoStream Published" << std::endl; std::cout << " - Publication Id: " << publication->Id() << std::endl; } } else { std::cout << " - Out of range" << std::endl; } } // デフォルトで設定されている音声入力デバイスをPublishします。 auto audio_devices = skyway::media::DeviceManager::GetRecordDevices(); if (audio_devices.size() > 0) { auto device = audio_devices[0]; skyway::media::DeviceManager::SetRecordingDevice(device); auto audio_stream = skyway::media::StreamFactory::CreateAudioStream(); skyway::room::interface::LocalRoomMember::PublicationOptions publication_options {}; auto publication = room_member_->Publish(audio_stream, publication_options); if (publication) { std::cout << "- AudioStream Published" << std::endl; std::cout << " - Device Index: " << device.index << std::endl; std::cout << " - Device Name: " << device.name << std::endl; std::cout << " - Publication Id: " << publication->Id() << std::endl; } } // DataStreamをPublishします。 const int data_stream_num = 2 for (int i = 0; i < data_stream_num; i++) { auto stream = skyway::media::StreamFactory::CreateDataStream(); skyway::room::interface::LocalRoomMember::PublicationOptions publication_options{}; auto publication = room_member_->Publish(stream, publication_options); if (publication) { std::cout << "- DataStream Published" << std::endl; std::cout << " - Publication Id: " << publication->Id() << std::endl; } data_streams_.emplace_back(stream); auto data_thread = std::make_unique<std::thread>( [this, stream, i] { while (!is_leaving_) { int random = rand() % 10 + 1; // 1〜10のランダムな整数 auto msg = "send msg: " + std::to_string(random); stream->Write(msg); std::cout << "- DataStream[" << i << "] Message Send: " << msg << std::endl; std::this_thread::sleep_for(std::chrono::seconds(1)); } }); threads_.emplace_back(std::move(data_thread)); } } // 指定のPublicationをSubscribeします。 bool ExampleRoom::Subscribe(std::shared_ptr<skyway::room::interface::RoomPublication> publication) { if (room_member_->Id() == publication->Publisher()->Id()) { // 自身がPublishしたPublicationはSubscribeできないので無視します。 return false; } if (this->IsSubscribed(publication)) { // 既にSubscribeしている場合は無視します。 return false; } // PublicationのContentTypeに応じてメディアを出力します。 skyway::room::interface::LocalRoomMember::SubscriptionOptions subscription_options {}; if (publication->ContentType() == skyway::model::ContentType::kVideo) { if (renderer_device_name_ == "") { // 出力先を指定していない場合はSubscribeしません。 std::cout << "- VideoStream Subscribe Canceled" << std::endl; std::cout << " - Video device not specified" << std::endl; return false; } if (renderer_) { // 既に映像を出力している場合はSubscribeしません。 std::cout << "- VideoStream Subscribe Canceled" << std::endl; std::cout << " - Already set renderer" << std::endl; return false; } auto subscription = room_member_->Subscribe(publication->Id(), subscription_options); if (!subscription) { return false; } auto stream = std::dynamic_pointer_cast<skyway::core::stream::remote::RemoteVideoStream>( subscription->Stream()); // 映像を出力デバイスに書き込みます。 skyway::media::V4l2VideoRendererOptions monitor_opt; monitor_opt.scaled_width = 1920; monitor_opt.scaled_height = 1080; renderer_ = std::make_unique<skyway::media::V4l2VideoRenderer>(renderer_device_name_, monitor_opt); renderer_->Render(stream); std::cout << "- VideoStream Subscribed" << std::endl; std::cout << " - Device : " << renderer_device_name_ << std::endl; std::cout << " - Specified Width: " << monitor_opt.scaled_width << std::endl; std::cout << " - Specified Height: " << monitor_opt.scaled_height << std::endl; std::cout << " - Publication Id: " << publication->Id() << std::endl; std::cout << " - Subscription Id: " << subscription->Id() << std::endl; } else if (publication->ContentType() == skyway::model::ContentType::kAudio) { auto devices = skyway::media::DeviceManager::GetPlayoutDevices(); if (devices.size() > 0) { auto device = devices[0]; skyway::media::DeviceManager::SetPlayoutDevice(device); // 音声はSubscribeと同時に再生されます。 auto subscription = room_member_->Subscribe(publication->Id(), subscription_options); if (!subscription) { return false; } std::cout << "- AudioStream Subscribed" << std::endl; std::cout << " - Name: " << device.name << std::endl; std::cout << " - Publication Id: " << publication->Id() << std::endl; std::cout << " - Subscription Id: " << subscription->Id() << std::endl; } } else if (publication->ContentType() == skyway::model::ContentType::kData) { auto subscription = room_member_->Subscribe(publication->Id(), subscription_options); if (!subscription) { return false; } auto data_stream = std::dynamic_pointer_cast<skyway::core::stream::remote::RemoteDataStream>( subscription->Stream()); auto listener = std::make_shared<DataStreamListener>(); data_stream->AddListener(listener.get()); data_listeners_.emplace_back(listener); std::cout << "- DataStream Subscribed" << std::endl; std::cout << " - Publication Id: " << publication->Id() << std::endl; std::cout << " - Subscription Id: " << subscription->Id() << std::endl; } return true; }