From dfdf7ced8e3b496e37c3dd2757688bafaaf12a34 Mon Sep 17 00:00:00 2001 From: Stephen DeRosa Date: Mon, 1 Jun 2026 22:23:15 -0600 Subject: [PATCH] Integration tests: 15kb rpc payloads (rpc v2). update rust sdk hash. --- client-sdk-rust | 2 +- src/tests/integration/test_rpc.cpp | 230 +++++++++++++++++++---------- 2 files changed, 153 insertions(+), 79 deletions(-) diff --git a/client-sdk-rust b/client-sdk-rust index d74892de..a0c91f5a 160000 --- a/client-sdk-rust +++ b/client-sdk-rust @@ -1 +1 @@ -Subproject commit d74892de57724bd08dc2307b68e4251b8eae2f33 +Subproject commit a0c91f5ae2309f3911c4a5d5843f385e6f20846a diff --git a/src/tests/integration/test_rpc.cpp b/src/tests/integration/test_rpc.cpp index cde9ee00..1abac063 100644 --- a/src/tests/integration/test_rpc.cpp +++ b/src/tests/integration/test_rpc.cpp @@ -33,8 +33,9 @@ namespace livekit::test { using namespace std::chrono_literals; -// Maximum RPC payload size (15KB) -constexpr size_t kMaxRpcPayloadSize = 15 * 1024; +// RPC v1 packet payload limit was ~15 KiB; these tests validate that RPC v2 can exceed it. +constexpr size_t kRpcV1PayloadLimit = 15 * 1024; +constexpr size_t kLargeRpcPayloadSize = 20 * 1000; // Test configuration from environment variables struct RpcTestConfig { @@ -90,6 +91,14 @@ std::string generateRandomPayload(size_t size) { return result.substr(0, size); } +size_t checksumPayload(const std::string& payload) { + size_t checksum = 0; + for (char c : payload) { + checksum += static_cast(c); + } + return checksum; +} + class RpcIntegrationTest : public ::testing::Test { protected: void SetUp() override { @@ -104,31 +113,36 @@ class RpcIntegrationTest : public ::testing::Test { // Test basic RPC round-trip TEST_F(RpcIntegrationTest, BasicRpcRoundTrip) { - if (!config_.available) { - GTEST_SKIP() << "LIVEKIT_URL, LIVEKIT_TOKEN_A, and LIVEKIT_TOKEN_B not set"; - } + EXPECT_TRUE(config_.available) << "Missing integration configuration"; // Create receiver room auto receiver_room = std::make_unique(); RoomOptions receiver_options; receiver_options.auto_subscribe = true; - bool receiver_connected = receiver_room->connect(config_.url, config_.token_b, receiver_options); - ASSERT_TRUE(receiver_connected) << "Receiver failed to connect"; + ASSERT_TRUE(receiver_room->connect(config_.url, config_.token_b, receiver_options)) << "Receiver failed to connect"; std::string receiver_identity = lockLocalParticipant(*receiver_room)->identity(); // Register RPC handler on receiver - returns size and checksum instead of // full payload std::atomic rpc_calls_received{0}; + std::string caller_identity; + std::string observed_request_id; + std::string observed_caller_identity; + double observed_response_timeout = 0.0; + std::mutex observed_mutex; lockLocalParticipant(*receiver_room) - ->registerRpcMethod("echo", [&rpc_calls_received](const RpcInvocationData& data) -> std::optional { + ->registerRpcMethod("echo", [&](const RpcInvocationData& data) -> std::optional { rpc_calls_received++; - size_t checksum = 0; - for (char c : data.payload) { - checksum += static_cast(c); + { + const std::scoped_lock lock(observed_mutex); + observed_request_id = data.request_id; + observed_caller_identity = data.caller_identity; + observed_response_timeout = data.response_timeout_sec; } - return "echo:" + std::to_string(data.payload.size()) + ":" + std::to_string(checksum); + EXPECT_EQ(data.payload, "hello world"); + return "echo:" + std::to_string(data.payload.size()) + ":" + std::to_string(checksumPayload(data.payload)); }); // Create caller room @@ -136,8 +150,8 @@ TEST_F(RpcIntegrationTest, BasicRpcRoundTrip) { RoomOptions caller_options; caller_options.auto_subscribe = true; - bool caller_connected = caller_room->connect(config_.url, config_.token_a, caller_options); - ASSERT_TRUE(caller_connected) << "Caller failed to connect"; + ASSERT_TRUE(caller_room->connect(config_.url, config_.token_a, caller_options)) << "Caller failed to connect"; + caller_identity = lockLocalParticipant(*caller_room)->identity(); // Wait for receiver to be visible to caller bool receiver_visible = waitForParticipant(caller_room.get(), receiver_identity, 10s); @@ -148,14 +162,17 @@ TEST_F(RpcIntegrationTest, BasicRpcRoundTrip) { std::string response = lockLocalParticipant(*caller_room)->performRpc(receiver_identity, "echo", test_payload, 10.0); // Verify response contains correct size and checksum - size_t expected_checksum = 0; - for (char c : test_payload) { - expected_checksum += static_cast(c); - } std::string expected_response = - "echo:" + std::to_string(test_payload.size()) + ":" + std::to_string(expected_checksum); + "echo:" + std::to_string(test_payload.size()) + ":" + std::to_string(checksumPayload(test_payload)); EXPECT_EQ(response, expected_response); EXPECT_EQ(rpc_calls_received.load(), 1); + { + const std::scoped_lock lock(observed_mutex); + EXPECT_FALSE(observed_request_id.empty()); + EXPECT_EQ(observed_caller_identity, caller_identity); + EXPECT_GT(observed_response_timeout, 0.0); + EXPECT_LE(observed_response_timeout, 10.0); + } // Cleanup lockLocalParticipant(*receiver_room)->unregisterRpcMethod("echo"); @@ -163,58 +180,132 @@ TEST_F(RpcIntegrationTest, BasicRpcRoundTrip) { receiver_room.reset(); } -// Test maximum payload size (15KB) -TEST_F(RpcIntegrationTest, MaxPayloadSize) { - if (!config_.available) { - GTEST_SKIP() << "LIVEKIT_URL, LIVEKIT_TOKEN_A, and LIVEKIT_TOKEN_B not set"; - } +TEST_F(RpcIntegrationTest, LargeRequestPayloadRoundTrip) { + EXPECT_TRUE(config_.available) << "Missing integration configuration"; + + auto receiver_room = std::make_unique(); + RoomOptions options; + options.auto_subscribe = true; + + ASSERT_TRUE(receiver_room->connect(config_.url, config_.token_b, options)) << "Receiver failed to connect"; + + std::string receiver_identity = lockLocalParticipant(*receiver_room)->identity(); + + std::atomic received_payload_size{0}; + std::atomic received_payload_checksum{0}; + lockLocalParticipant(*receiver_room) + ->registerRpcMethod("large-request", [&](const RpcInvocationData& data) -> std::optional { + received_payload_size = data.payload.size(); + received_payload_checksum = checksumPayload(data.payload); + return std::to_string(data.payload.size()) + ":" + std::to_string(checksumPayload(data.payload)); + }); + + auto caller_room = std::make_unique(); + ASSERT_TRUE(caller_room->connect(config_.url, config_.token_a, options)) << "Caller failed to connect"; + + bool receiver_visible = waitForParticipant(caller_room.get(), receiver_identity, 10s); + ASSERT_TRUE(receiver_visible) << "Receiver not visible to caller"; + + std::string max_payload = generateRandomPayload(kLargeRpcPayloadSize); + ASSERT_GT(max_payload.size(), kRpcV1PayloadLimit); + std::string response = + lockLocalParticipant(*caller_room)->performRpc(receiver_identity, "large-request", max_payload, 30.0); + + EXPECT_EQ(response, std::to_string(kLargeRpcPayloadSize) + ":" + std::to_string(checksumPayload(max_payload))); + EXPECT_EQ(received_payload_size.load(), kLargeRpcPayloadSize); + EXPECT_EQ(received_payload_checksum.load(), checksumPayload(max_payload)); + + lockLocalParticipant(*receiver_room)->unregisterRpcMethod("large-request"); + caller_room.reset(); + receiver_room.reset(); +} + +TEST_F(RpcIntegrationTest, LargeResponsePayloadRoundTrip) { + EXPECT_TRUE(config_.available) << "Missing integration configuration"; auto receiver_room = std::make_unique(); RoomOptions options; options.auto_subscribe = true; - bool receiver_connected = receiver_room->connect(config_.url, config_.token_b, options); - ASSERT_TRUE(receiver_connected) << "Receiver failed to connect"; + ASSERT_TRUE(receiver_room->connect(config_.url, config_.token_b, options)) << "Receiver failed to connect"; std::string receiver_identity = lockLocalParticipant(*receiver_room)->identity(); + const std::string large_response = generateRandomPayload(kLargeRpcPayloadSize); + ASSERT_GT(large_response.size(), kRpcV1PayloadLimit); - // Register handler that echoes payload size lockLocalParticipant(*receiver_room) - ->registerRpcMethod("payload-size", [](const RpcInvocationData& data) -> std::optional { - return std::to_string(data.payload.size()); + ->registerRpcMethod("large-response", [&](const RpcInvocationData& data) -> std::optional { + EXPECT_EQ(data.payload, "send-large-response"); + return large_response; }); auto caller_room = std::make_unique(); - bool caller_connected = caller_room->connect(config_.url, config_.token_a, options); - ASSERT_TRUE(caller_connected) << "Caller failed to connect"; + ASSERT_TRUE(caller_room->connect(config_.url, config_.token_a, options)) << "Caller failed to connect"; bool receiver_visible = waitForParticipant(caller_room.get(), receiver_identity, 10s); ASSERT_TRUE(receiver_visible) << "Receiver not visible to caller"; - // Test with max payload size (15KB) - std::string max_payload = generateRandomPayload(kMaxRpcPayloadSize); std::string response = - lockLocalParticipant(*caller_room)->performRpc(receiver_identity, "payload-size", max_payload, 30.0); + lockLocalParticipant(*caller_room)->performRpc(receiver_identity, "large-response", "send-large-response", 30.0); - EXPECT_EQ(response, std::to_string(kMaxRpcPayloadSize)); + EXPECT_EQ(response.size(), large_response.size()); + EXPECT_EQ(checksumPayload(response), checksumPayload(large_response)); + EXPECT_EQ(response, large_response); - lockLocalParticipant(*receiver_room)->unregisterRpcMethod("payload-size"); + lockLocalParticipant(*receiver_room)->unregisterRpcMethod("large-response"); + caller_room.reset(); + receiver_room.reset(); +} + +TEST_F(RpcIntegrationTest, LargeRequestAndResponseRoundTrip) { + EXPECT_TRUE(config_.available) << "Missing integration configuration"; + + auto receiver_room = std::make_unique(); + RoomOptions options; + options.auto_subscribe = true; + + ASSERT_TRUE(receiver_room->connect(config_.url, config_.token_b, options)) << "Receiver failed to connect"; + + std::string receiver_identity = lockLocalParticipant(*receiver_room)->identity(); + const std::string large_response = generateRandomPayload(kLargeRpcPayloadSize + 123); + ASSERT_GT(large_response.size(), kRpcV1PayloadLimit); + + lockLocalParticipant(*receiver_room) + ->registerRpcMethod("large-both", [&](const RpcInvocationData& data) -> std::optional { + EXPECT_GT(data.payload.size(), kRpcV1PayloadLimit); + return large_response; + }); + + auto caller_room = std::make_unique(); + ASSERT_TRUE(caller_room->connect(config_.url, config_.token_a, options)) << "Caller failed to connect"; + + bool receiver_visible = waitForParticipant(caller_room.get(), receiver_identity, 10s); + ASSERT_TRUE(receiver_visible) << "Receiver not visible to caller"; + + const std::string large_request = generateRandomPayload(kLargeRpcPayloadSize); + ASSERT_GT(large_request.size(), kRpcV1PayloadLimit); + + std::string response = + lockLocalParticipant(*caller_room)->performRpc(receiver_identity, "large-both", large_request, 30.0); + + EXPECT_EQ(response.size(), large_response.size()); + EXPECT_EQ(checksumPayload(response), checksumPayload(large_response)); + EXPECT_EQ(response, large_response); + + lockLocalParticipant(*receiver_room)->unregisterRpcMethod("large-both"); caller_room.reset(); receiver_room.reset(); } // Test RPC timeout TEST_F(RpcIntegrationTest, RpcTimeout) { - if (!config_.available) { - GTEST_SKIP() << "LIVEKIT_URL, LIVEKIT_TOKEN_A, and LIVEKIT_TOKEN_B not set"; - } + EXPECT_TRUE(config_.available) << "Missing integration configuration"; auto receiver_room = std::make_unique(); RoomOptions options; options.auto_subscribe = true; - bool receiver_connected = receiver_room->connect(config_.url, config_.token_b, options); - ASSERT_TRUE(receiver_connected) << "Receiver failed to connect"; + ASSERT_TRUE(receiver_room->connect(config_.url, config_.token_b, options)) << "Receiver failed to connect"; std::string receiver_identity = lockLocalParticipant(*receiver_room)->identity(); @@ -226,8 +317,7 @@ TEST_F(RpcIntegrationTest, RpcTimeout) { }); auto caller_room = std::make_unique(); - bool caller_connected = caller_room->connect(config_.url, config_.token_a, options); - ASSERT_TRUE(caller_connected) << "Caller failed to connect"; + ASSERT_TRUE(caller_room->connect(config_.url, config_.token_a, options)) << "Caller failed to connect"; bool receiver_visible = waitForParticipant(caller_room.get(), receiver_identity, 10s); ASSERT_TRUE(receiver_visible) << "Receiver not visible to caller"; @@ -243,22 +333,18 @@ TEST_F(RpcIntegrationTest, RpcTimeout) { // Test RPC with unsupported method TEST_F(RpcIntegrationTest, UnsupportedMethod) { - if (!config_.available) { - GTEST_SKIP() << "LIVEKIT_URL, LIVEKIT_TOKEN_A, and LIVEKIT_TOKEN_B not set"; - } + EXPECT_TRUE(config_.available) << "Missing integration configuration"; auto receiver_room = std::make_unique(); RoomOptions options; options.auto_subscribe = true; - bool receiver_connected = receiver_room->connect(config_.url, config_.token_b, options); - ASSERT_TRUE(receiver_connected) << "Receiver failed to connect"; + ASSERT_TRUE(receiver_room->connect(config_.url, config_.token_b, options)) << "Receiver failed to connect"; std::string receiver_identity = lockLocalParticipant(*receiver_room)->identity(); auto caller_room = std::make_unique(); - bool caller_connected = caller_room->connect(config_.url, config_.token_a, options); - ASSERT_TRUE(caller_connected) << "Caller failed to connect"; + ASSERT_TRUE(caller_room->connect(config_.url, config_.token_a, options)) << "Caller failed to connect"; bool receiver_visible = waitForParticipant(caller_room.get(), receiver_identity, 10s); ASSERT_TRUE(receiver_visible) << "Receiver not visible to caller"; @@ -277,16 +363,13 @@ TEST_F(RpcIntegrationTest, UnsupportedMethod) { // Test RPC with application error TEST_F(RpcIntegrationTest, ApplicationError) { - if (!config_.available) { - GTEST_SKIP() << "LIVEKIT_URL, LIVEKIT_TOKEN_A, and LIVEKIT_TOKEN_B not set"; - } + EXPECT_TRUE(config_.available) << "Missing integration configuration"; auto receiver_room = std::make_unique(); RoomOptions options; options.auto_subscribe = true; - bool receiver_connected = receiver_room->connect(config_.url, config_.token_b, options); - ASSERT_TRUE(receiver_connected) << "Receiver failed to connect"; + ASSERT_TRUE(receiver_room->connect(config_.url, config_.token_b, options)) << "Receiver failed to connect"; std::string receiver_identity = lockLocalParticipant(*receiver_room)->identity(); @@ -297,8 +380,7 @@ TEST_F(RpcIntegrationTest, ApplicationError) { }); auto caller_room = std::make_unique(); - bool caller_connected = caller_room->connect(config_.url, config_.token_a, options); - ASSERT_TRUE(caller_connected) << "Caller failed to connect"; + ASSERT_TRUE(caller_room->connect(config_.url, config_.token_a, options)) << "Caller failed to connect"; bool receiver_visible = waitForParticipant(caller_room.get(), receiver_identity, 10s); ASSERT_TRUE(receiver_visible) << "Receiver not visible to caller"; @@ -317,31 +399,26 @@ TEST_F(RpcIntegrationTest, ApplicationError) { // Test multiple concurrent RPC calls TEST_F(RpcIntegrationTest, ConcurrentRpcCalls) { - if (!config_.available) { - GTEST_SKIP() << "LIVEKIT_URL, LIVEKIT_TOKEN_A, and LIVEKIT_TOKEN_B not set"; - } + EXPECT_TRUE(config_.available) << "Missing integration configuration"; auto receiver_room = std::make_unique(); RoomOptions options; options.auto_subscribe = true; - bool receiver_connected = receiver_room->connect(config_.url, config_.token_b, options); - ASSERT_TRUE(receiver_connected) << "Receiver failed to connect"; + ASSERT_TRUE(receiver_room->connect(config_.url, config_.token_b, options)) << "Receiver failed to connect"; std::string receiver_identity = lockLocalParticipant(*receiver_room)->identity(); std::atomic calls_processed{0}; lockLocalParticipant(*receiver_room) ->registerRpcMethod("counter", [&calls_processed](const RpcInvocationData& data) -> std::optional { - int id = std::stoi(data.payload); calls_processed++; std::this_thread::sleep_for(100ms); // Simulate some work - return std::to_string(id * 2); + return data.payload + ":handled"; }); auto caller_room = std::make_unique(); - bool caller_connected = caller_room->connect(config_.url, config_.token_a, options); - ASSERT_TRUE(caller_connected) << "Caller failed to connect"; + ASSERT_TRUE(caller_room->connect(config_.url, config_.token_a, options)) << "Caller failed to connect"; bool receiver_visible = waitForParticipant(caller_room.get(), receiver_identity, 10s); ASSERT_TRUE(receiver_visible) << "Receiver not visible to caller"; @@ -350,7 +427,7 @@ TEST_F(RpcIntegrationTest, ConcurrentRpcCalls) { // it cannot expire mid-call while RPCs are in flight. ASSERT_NO_THROW(lockLocalParticipant(*caller_room)); - const int num_concurrent_calls = 10; + const int num_concurrent_calls = 5; std::vector threads; std::atomic successful_calls{0}; @@ -359,9 +436,10 @@ TEST_F(RpcIntegrationTest, ConcurrentRpcCalls) { try { auto caller_lp = lockLocalParticipant(*caller_room); ASSERT_NE(caller_lp, nullptr); - std::string response = caller_lp->performRpc(receiver_identity, "counter", std::to_string(i), 30.0); - int expected = i * 2; - if (std::stoi(response) == expected) { + const std::string payload = + "call-" + std::to_string(i) + ":" + std::string(256 + i, static_cast('a' + i)); + std::string response = caller_lp->performRpc(receiver_identity, "counter", payload, 30.0); + if (response == payload + ":handled") { successful_calls++; } } catch (const std::exception& e) { @@ -384,16 +462,13 @@ TEST_F(RpcIntegrationTest, ConcurrentRpcCalls) { // Integration test: Run for approximately 1 minute TEST_F(RpcIntegrationTest, OneMinuteIntegration) { - if (!config_.available) { - GTEST_SKIP() << "LIVEKIT_URL, LIVEKIT_TOKEN_A, and LIVEKIT_TOKEN_B not set"; - } + EXPECT_TRUE(config_.available) << "Missing integration configuration"; auto receiver_room = std::make_unique(); RoomOptions options; options.auto_subscribe = true; - bool receiver_connected = receiver_room->connect(config_.url, config_.token_b, options); - ASSERT_TRUE(receiver_connected) << "Receiver failed to connect"; + ASSERT_TRUE(receiver_room->connect(config_.url, config_.token_b, options)) << "Receiver failed to connect"; std::string receiver_identity = lockLocalParticipant(*receiver_room)->identity(); @@ -408,8 +483,7 @@ TEST_F(RpcIntegrationTest, OneMinuteIntegration) { }); auto caller_room = std::make_unique(); - bool caller_connected = caller_room->connect(config_.url, config_.token_a, options); - ASSERT_TRUE(caller_connected) << "Caller failed to connect"; + ASSERT_TRUE(caller_room->connect(config_.url, config_.token_a, options)) << "Caller failed to connect"; bool receiver_visible = waitForParticipant(caller_room.get(), receiver_identity, 10s); ASSERT_TRUE(receiver_visible) << "Receiver not visible to caller"; @@ -429,7 +503,7 @@ TEST_F(RpcIntegrationTest, OneMinuteIntegration) { // Sender thread std::thread sender([&]() { - std::vector payload_sizes = {100, 1024, 5 * 1024, 10 * 1024, kMaxRpcPayloadSize}; + std::vector payload_sizes = {100, 1024, 5 * 1024, 10 * 1024, kRpcV1PayloadLimit}; int size_index = 0; while (running.load()) {