---
lang: ja
path: cookbook/linux-sdk/multiple-data-stream
labels: クックブック/Linux SDK/複数のDataStreamを利用する
metaTitle: 複数のDataStreamを利用する | Linux SDK｜ クックブック ｜ SkyWay（スカイウェイ）
---

# 複数のDataStreamを利用する
SkyWay Linux®︎ SDK (以下、Linux SDK ) のクイックスタートでは、複数のDataStreamをPublish、Subscribeできるように実装されていません。

本ドキュメントのように実装することで複数のDataStreamを扱うことができます。

> **注意**
>本ドキュメントはクイックスタートをベースとしたカスタマイズ方法を提示しています。
>
>詳しい実装については[クイックスタート](https://skyway.ntt.com/ja/docs/user-guide/linux-sdk/quickstart/)を確認してください。

## Publish
クイックスタートのサンプルは1つのDataStreamをPublishする実装となっています。

そのため、複数のStreamを扱うために`ExampleRoom`クラスの DataStream を管理するメンバーを以下に書き換えます。

```cpp
std::vector<std::shared_ptr<skyway::core::stream::local::LocalDataStream>> data_streams_;
```

`// DataStreamをPublishします。` というコメントに続く DataStream の送信処理を以下の内容に書き換えます。
```cpp
const int data_stream_num = 2
for (int i = 0; i < data_stream_num; i++) {
        auto stream = skyway::media::StreamFactory::CreateDataStream();
        skyway::room::interface::LocalRoomMember::PublicationOptions publication_options{};
        auto publication = room_member_->Publish(stream, publication_options);
        if (publication) {
            std::cout << "- DataStream Published" << std::endl;
            std::cout << "  - Publication Id: " << publication->Id() << std::endl;
        }
        data_streams_.emplace_back(stream);

        auto data_thread = std::make_unique<std::thread>(
            [this, stream, i] {
                while (!is_leaving_) {
                    int random = rand() % 10 + 1; // 1〜10のランダムな整数
                    auto msg = "send msg: " + std::to_string(random);
                    stream->Write(msg);
                    std::cout << "- DataStream[" << i << "] Message Send: " << msg << std::endl;
                    std::this_thread::sleep_for(std::chrono::seconds(1));
                }
        });
        threads_.emplace_back(std::move(data_thread));
    }
```
このサンプルでは1〜10までのランダムな整数のデータを送信します。


## Subscribe
Subscribeした複数の DataStream に Listener を登録する際は、それぞれのStreamに別々のListenerの実体を登録する必要があります。
クイックスタートのサンプルは`ExampleRoom`自身を DataStream の Listener としているため、Listener の実体がひとつしかなく複数の DataStream に登録することができません。

それぞれの DataStream に Listener を用意するため、Listener 部分を専用のクラスに分離します。

まずは、以下のクラスを作成してください。
```cpp
// data_stream_listener.hpp

#include <skyway/core/stream/remote/data_stream.hpp>

#include <iostream>

class DataStreamListener : public skyway::core::stream::remote::RemoteDataStream::Listener {
public:
    DataStreamListener() {}

    // Impl RemoteDataStream::Listener
    void OnData(const std::string& data) override {
        std::cout << "- [Event] DataStream Message Received: " << data << std::endl;
    }
    void OnDataBuffer(const uint8_t* data, size_t length) override {
        std::cout << "- [Event] DataStream BinaryData Received: Length=" << length << std::endl;
    }
};
```
その後、`example_room.hpp` と `example_room.cpp`からリスナー実装を削除し、`example_room.hpp`に以下のincludeディレクティブを追加してください。
```cpp
#include "data_stream_listener.hpp"
```
複数のSubscriptionを保持するために、メンバー変数を以下のように変更します。
```cpp
std::vector<std::shared_ptr<DataStreamListener>> data_listeners_;
```

クイックスタートでは`ExampleRoom`自身をリスナー登録していましたが、`DataStreamListener` を生成して登録するように変更してください。
```cpp
auto listener = std::make_shared<DataStreamListener>();  
data_stream->AddListener(listener.get()); 
data_listeners_.emplace_back(listener);
```
このように変更することで、複数のDataStreamを同時にSubscribeできます。

## コード全体
以下にカスタマイズ後のコード全体を提示します。

example_room.hpp
```cpp
#include <vector>

#include <skyway/context.hpp>
#include <skyway/media/device_manager.hpp>
#include <skyway/media/stream_factory.hpp>
#include <skyway/media/v4l2_video_renderer.hpp>
#include <skyway/room/p2p/p2p_room.hpp>

#include "data_stream_listener.hpp"

// Roomの操作を行うクラスです。
class ExampleRoom : public skyway::room::interface::Room::EventListener {
public:
    ExampleRoom(const std::string& renderer_device_name);

    // SkyWayの利用を開始します。
    bool Setup(const std::string& app_id, const std::string& secret_key);

    // P2PRoomを検索/作成し、入室します。
    bool JoinRoom(const std::string& room_name);

    // Video/Audio/DataをPublishします。
    void Publish();

    // 指定のPublicationをSubscribeします。
    bool Subscribe(std::shared_ptr<skyway::room::interface::RoomPublication> publication);

    // P2PRoomに存在するPublication全てに対してSubscribeを試みます。
    void SubscribeAll();

    // P2PRoomから退出します。
    bool LeaveRoom();

    // SkyWayの利用を終了します。
    void Dispose();

    // Impl skyway::room::interface::Room::EventListener
    void OnStreamPublished(
        std::shared_ptr<skyway::room::interface::RoomPublication> publication) override;

private:
    // 指定のPublicationをSubscribeしているかチェックします。
    bool IsSubscribed(std::shared_ptr<skyway::room::interface::RoomPublication> publication);

    std::shared_ptr<skyway::room::p2p::P2PRoom> p2proom_;
    std::shared_ptr<skyway::room::p2p::LocalP2PRoomMember> room_member_;
    std::unique_ptr<skyway::media::V4l2VideoRenderer> renderer_;
    std::vector<std::shared_ptr<skyway::core::stream::local::LocalDataStream>> data_streams_;
    std::vector<std::shared_ptr<DataStreamReceiver>> data_receivers_;
    std::string renderer_device_name_;
    std::vector<std::unique_ptr<std::thread>> threads_;
    std::atomic<bool> is_leaving_;
};
```

example_room.cpp

`Publish()`, `Subscribe()`以外の処理に変更はないため省略します。
```cpp
// Video/Audio/DataをPublishします。
void ExampleRoom::Publish() {
    // ビデオデバイスを列挙し、任意のデバイスをPublishします。
    auto video_devices = skyway::media::DeviceManager::GetVideoDevices();
    if (video_devices.size() > 0) {
        std::cout << "- VideoDevices" << std::endl;
        for (auto device : video_devices) {
            std::cout << "  - Index: " << device.index << " Name: " << device.name << std::endl;
        }

        // デバイスのindex番号を入力します。
        int device_index;
        std::cout << "- Enter the index of the video device to be published: ";
        std::cin >> device_index;
        if (device_index >= 0 && device_index < video_devices.size()) {
            auto video_stream =
                skyway::media::StreamFactory::CreateVideoStream(video_devices[device_index]);
            skyway::room::interface::LocalRoomMember::PublicationOptions publication_options {};
            auto publication = room_member_->Publish(video_stream, publication_options);
            if (publication) {
                std::cout << "  - VideoStream Published" << std::endl;
                std::cout << "    - Publication Id: " << publication->Id() << std::endl;
            }
        } else {
            std::cout << "  - Out of range" << std::endl;
        }
    }

    // デフォルトで設定されている音声入力デバイスをPublishします。
    auto audio_devices = skyway::media::DeviceManager::GetRecordDevices();
    if (audio_devices.size() > 0) {
        auto device = audio_devices[0];
        skyway::media::DeviceManager::SetRecordingDevice(device);
        auto audio_stream = skyway::media::StreamFactory::CreateAudioStream();
        skyway::room::interface::LocalRoomMember::PublicationOptions publication_options {};
        auto publication = room_member_->Publish(audio_stream, publication_options);
        if (publication) {
            std::cout << "- AudioStream Published" << std::endl;
            std::cout << "  - Device Index: " << device.index << std::endl;
            std::cout << "  - Device Name: " << device.name << std::endl;
            std::cout << "  - Publication Id: " << publication->Id() << std::endl;
        }
    }

    // DataStreamをPublishします。
    const int data_stream_num = 2
    for (int i = 0; i < data_stream_num; i++) {
        auto stream = skyway::media::StreamFactory::CreateDataStream();
        skyway::room::interface::LocalRoomMember::PublicationOptions publication_options{};
        auto publication = room_member_->Publish(stream, publication_options);
        if (publication) {
            std::cout << "- DataStream Published" << std::endl;
            std::cout << "  - Publication Id: " << publication->Id() << std::endl;
        }
        data_streams_.emplace_back(stream);

        auto data_thread = std::make_unique<std::thread>(
            [this, stream, i] {
                while (!is_leaving_) {
                    int random = rand() % 10 + 1; // 1〜10のランダムな整数
                    auto msg = "send msg: " + std::to_string(random);
                    stream->Write(msg);
                    std::cout << "- DataStream[" << i << "] Message Send: " << msg << std::endl;
                    std::this_thread::sleep_for(std::chrono::seconds(1));
                }
        });
        threads_.emplace_back(std::move(data_thread));
    }
}

// 指定のPublicationをSubscribeします。
bool ExampleRoom::Subscribe(std::shared_ptr<skyway::room::interface::RoomPublication> publication) {
    if (room_member_->Id() == publication->Publisher()->Id()) {
        // 自身がPublishしたPublicationはSubscribeできないので無視します。
        return false;
    }
    if (this->IsSubscribed(publication)) {
        // 既にSubscribeしている場合は無視します。
        return false;
    }

    // PublicationのContentTypeに応じてメディアを出力します。
    skyway::room::interface::LocalRoomMember::SubscriptionOptions subscription_options {};
    if (publication->ContentType() == skyway::model::ContentType::kVideo) {
        if (renderer_device_name_ == "") {
            // 出力先を指定していない場合はSubscribeしません。
            std::cout << "- VideoStream Subscribe Canceled" << std::endl;
            std::cout << "  - Video device not specified" << std::endl;
            return false;
        }
        if (renderer_) {
            // 既に映像を出力している場合はSubscribeしません。
            std::cout << "- VideoStream Subscribe Canceled" << std::endl;
            std::cout << "  - Already set renderer" << std::endl;
            return false;
        }
        auto subscription = room_member_->Subscribe(publication->Id(), subscription_options);
        if (!subscription) {
            return false;
        }
        auto stream = std::dynamic_pointer_cast<skyway::core::stream::remote::RemoteVideoStream>(
            subscription->Stream());

        // 映像を出力デバイスに書き込みます。
        skyway::media::V4l2VideoRendererOptions monitor_opt;
        monitor_opt.scaled_width  = 1920;
        monitor_opt.scaled_height = 1080;
        renderer_ =
            std::make_unique<skyway::media::V4l2VideoRenderer>(renderer_device_name_, monitor_opt);
        renderer_->Render(stream);

        std::cout << "- VideoStream Subscribed" << std::endl;
        std::cout << "  - Device : " << renderer_device_name_ << std::endl;
        std::cout << "  - Specified Width: " << monitor_opt.scaled_width << std::endl;
        std::cout << "  - Specified Height: " << monitor_opt.scaled_height << std::endl;
        std::cout << "  - Publication Id: " << publication->Id() << std::endl;
        std::cout << "  - Subscription Id: " << subscription->Id() << std::endl;
    } else if (publication->ContentType() == skyway::model::ContentType::kAudio) {
        auto devices = skyway::media::DeviceManager::GetPlayoutDevices();
        if (devices.size() > 0) {
            auto device = devices[0];
            skyway::media::DeviceManager::SetPlayoutDevice(device);
            // 音声はSubscribeと同時に再生されます。
            auto subscription = room_member_->Subscribe(publication->Id(), subscription_options);
            if (!subscription) {
                return false;
            }
            std::cout << "- AudioStream Subscribed" << std::endl;
            std::cout << "  - Name: " << device.name << std::endl;
            std::cout << "  - Publication Id: " << publication->Id() << std::endl;
            std::cout << "  - Subscription Id: " << subscription->Id() << std::endl;
        }
    } else if (publication->ContentType() == skyway::model::ContentType::kData) {
        auto subscription = room_member_->Subscribe(publication->Id(), subscription_options);
        if (!subscription) {
            return false;
        }
        auto data_stream =
            std::dynamic_pointer_cast<skyway::core::stream::remote::RemoteDataStream>(
                subscription->Stream());
        auto listener = std::make_shared<DataStreamListener>();  
        data_stream->AddListener(listener.get()); 
        data_listeners_.emplace_back(listener);
        std::cout << "- DataStream Subscribed" << std::endl;
        std::cout << "  - Publication Id: " << publication->Id() << std::endl;
        std::cout << "  - Subscription Id: " << subscription->Id() << std::endl;
    }
    return true;
}
```
## 商標
Linux®︎は、米国およびその他の国における Linus Torvalds の登録商標です。
