diff --git a/modules/mqtt_streaming_module/include/mqtt_streaming_module/mqtt_subscriber_fb_impl.h b/modules/mqtt_streaming_module/include/mqtt_streaming_module/mqtt_subscriber_fb_impl.h index f6988db..04c61bd 100644 --- a/modules/mqtt_streaming_module/include/mqtt_streaming_module/mqtt_subscriber_fb_impl.h +++ b/modules/mqtt_streaming_module/include/mqtt_streaming_module/mqtt_subscriber_fb_impl.h @@ -86,6 +86,7 @@ class MqttSubscriberFbImpl final : public FunctionBlock std::mutex queueMutex; std::condition_variable queueCv; mutable std::recursive_mutex processingMutex; + std::mutex componentStatusMutex; DAQ_MQTT_STREAM_MODULE_API void onSignalsMessage(const mqtt::MqttAsyncClient& subscriber, const mqtt::MqttMessage& msg); diff --git a/modules/mqtt_streaming_module/src/mqtt_subscriber_fb_impl.cpp b/modules/mqtt_streaming_module/src/mqtt_subscriber_fb_impl.cpp index e389ed3..203d2c2 100644 --- a/modules/mqtt_streaming_module/src/mqtt_subscriber_fb_impl.cpp +++ b/modules/mqtt_streaming_module/src/mqtt_subscriber_fb_impl.cpp @@ -101,6 +101,8 @@ void MqttSubscriberFbImpl::updateStatuses() if (!statuses->isUpdated()) return; + std::scoped_lock lock(componentStatusMutex); + if (!jsonConfigErr.ok()) { setComponentStatusWithMessage(ComponentStatus::Error, jsonConfigErr.buildStatusMessage()); @@ -508,7 +510,6 @@ void MqttSubscriberFbImpl::processMessageImpl(const mqtt::MqttMessage& msg, cons decoderFb->processMessage(jsonObjStr, epochTime); } } - updateStatuses(); } void MqttSubscriberFbImpl::processingLoop() diff --git a/modules/mqtt_streaming_module/tests/test_mqtt_subscriber_fb.cpp b/modules/mqtt_streaming_module/tests/test_mqtt_subscriber_fb.cpp index 57a9306..ac27cf9 100644 --- a/modules/mqtt_streaming_module/tests/test_mqtt_subscriber_fb.cpp +++ b/modules/mqtt_streaming_module/tests/test_mqtt_subscriber_fb.cpp @@ -1,7 +1,6 @@ #include "mqtt_streaming_module/mqtt_subscriber_fb_impl.h" #include "test_daq_test_helper.h" #include "test_data.h" -#include #include #include #include @@ -72,6 +71,35 @@ class MqttSubscriberFbHelper fb->updateStatuses(); return dp; } + + bool waitStatusTheSame(size_t ms, daq::FunctionBlockPtr fb, EnumerationPtr status) { + helper::utils::Timer timer(ms); + auto getComponentStatus = [&fb]() { return fb.getStatusContainer().getStatus("ComponentStatus"); }; + bool result = true; + while (!timer.expired()) + { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + result = (getComponentStatus() == status); + if (result == false) + break; + } + return result; + } + + bool waitStatusChange(size_t ms, daq::FunctionBlockPtr fb, EnumerationPtr status) { + helper::utils::Timer timer(ms); + auto getComponentStatus = [&fb]() { return fb.getStatusContainer().getStatus("ComponentStatus"); }; + bool result = false; + while (!timer.expired()) + { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + result = (getComponentStatus() == status); + if (result == true) + break; + } + return result; + + } }; class MqttSubscriberFbTest : public testing::Test, public DaqTestHelper, public MqttSubscriberFbHelper @@ -185,8 +213,7 @@ TEST_F(MqttSubscriberFbTest, CreationWithDefaultConfig) daq::FunctionBlockPtr subFb; ASSERT_NO_THROW(subFb = clientMqttFb.addFunctionBlock(SUB_FB_NAME)); EXPECT_EQ(subFb.getSignals(daq::search::Any()).getCount(), 0u); - ASSERT_EQ(subFb.getStatusContainer().getStatus("ComponentStatus"), - Enumeration("ComponentStatusType", "Error", daqInstance.getContext().getTypeManager())); + ASSERT_TRUE(waitStatusChange(1500, subFb, Enumeration("ComponentStatusType", "Error", daqInstance.getContext().getTypeManager()))); } TEST_F(MqttSubscriberFbTest, CreationWithPartialConfig) @@ -198,8 +225,7 @@ TEST_F(MqttSubscriberFbTest, CreationWithPartialConfig) config.addProperty(StringProperty(PROPERTY_NAME_SUB_TOPIC, String(buildTopicName()))); ASSERT_NO_THROW(subFb = clientMqttFb.addFunctionBlock(SUB_FB_NAME, config)); EXPECT_EQ(subFb.getSignals(daq::search::Any()).getCount(), 0u); - ASSERT_EQ(subFb.getStatusContainer().getStatus("ComponentStatus"), - Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager())); + ASSERT_TRUE(waitStatusChange(1500, subFb, Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager()))); } TEST_F(MqttSubscriberFbTest, CreationWithCustomConfig) @@ -213,8 +239,7 @@ TEST_F(MqttSubscriberFbTest, CreationWithCustomConfig) config.setPropertyValue(PROPERTY_NAME_SUB_PREVIEW_SIGNAL_TS_MODE, static_cast(SDSM::SystemTime)); ASSERT_NO_THROW(subFb = clientMqttFb.addFunctionBlock(SUB_FB_NAME, config)); EXPECT_EQ(subFb.getSignals(daq::search::Any()).getCount(), 2u); - ASSERT_EQ(subFb.getStatusContainer().getStatus("ComponentStatus"), - Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager())); + ASSERT_TRUE(waitStatusChange(1500, subFb, Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager()))); } TEST_F(MqttSubscriberFbTest, PreviewSignal) @@ -261,8 +286,7 @@ TEST_F(MqttSubscriberFbTest, SubscriptionStatusWaitingForData) config.setPropertyValue(PROPERTY_NAME_SUB_TOPIC, buildTopicName()); daq::FunctionBlockPtr subFb; ASSERT_NO_THROW(subFb = clientMqttFb.addFunctionBlock(SUB_FB_NAME, config)); - ASSERT_EQ(subFb.getStatusContainer().getStatus("ComponentStatus"), - Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager())); + ASSERT_TRUE(waitStatusChange(1500, subFb, Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager()))); } TEST_P(MqttSubscriberFbTopicPTest, CheckSubscriberFbTopic) @@ -279,8 +303,7 @@ TEST_P(MqttSubscriberFbTopicPTest, CheckSubscriberFbTopic) auto signals = fb.getSignals(); ASSERT_EQ(signals.getCount(), 1); const auto expectedComponentStatus = result ? "Warning" : "Error"; - EXPECT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"), - Enumeration("ComponentStatusType", expectedComponentStatus, daqInstance.getContext().getTypeManager())); + ASSERT_TRUE(waitStatusChange(1500, fb, Enumeration("ComponentStatusType", expectedComponentStatus, daqInstance.getContext().getTypeManager()))); } INSTANTIATE_TEST_SUITE_P(TopicTest, @@ -323,16 +346,14 @@ TEST_F(MqttSubscriberFbTest, TwoFbCreation) auto config = PropertyObject(); config.addProperty(StringProperty(PROPERTY_NAME_SUB_TOPIC, buildTopicName("0"))); ASSERT_NO_THROW(fb = clientMqttFb.addFunctionBlock(SUB_FB_NAME, config)); - EXPECT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"), - Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager())); + EXPECT_TRUE(waitStatusChange(1500, fb, Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager()))); } { daq::FunctionBlockPtr fb; auto config = PropertyObject(); config.addProperty(StringProperty(PROPERTY_NAME_SUB_TOPIC, buildTopicName("1"))); ASSERT_NO_THROW(fb = clientMqttFb.addFunctionBlock(SUB_FB_NAME, config)); - EXPECT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"), - Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager())); + EXPECT_TRUE(waitStatusChange(1500, fb, Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager()))); } auto fbs = clientMqttFb.getFunctionBlocks(); ASSERT_EQ(fbs.getCount(), 2u); @@ -347,8 +368,7 @@ TEST_F(MqttSubscriberFbTest, PropertyChanged) auto topic = buildTopicName("0"); config.addProperty(StringProperty(PROPERTY_NAME_SUB_TOPIC, topic)); ASSERT_NO_THROW(fb = clientMqttFb.addFunctionBlock(SUB_FB_NAME, config)); - EXPECT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"), - Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager())); + EXPECT_TRUE(waitStatusChange(1500, fb, Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager()))); auto subFb = reinterpret_cast(*fb); ASSERT_EQ(topic, subFb->getSubscribedTopic()); @@ -365,8 +385,7 @@ TEST_F(MqttSubscriberFbTest, JsonInit0) config.setPropertyValue(PROPERTY_NAME_SUB_JSON_CONFIG, String(VALID_JSON_1_TOPIC_0)); ASSERT_NO_THROW(subFb = clientMqttFb.addFunctionBlock(SUB_FB_NAME, config)); ASSERT_EQ(subFb.getFunctionBlocks().getCount(), 3u); - ASSERT_EQ(subFb.getStatusContainer().getStatus("ComponentStatus"), - Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager())); + EXPECT_TRUE(waitStatusChange(1500, subFb, Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager()))); auto lambda = [&](FunctionBlockPtr nestedFb, std::string value, std::string ts, std::string symbol) { EXPECT_EQ(nestedFb.getSignals()[0].getName().toStdString(), value); @@ -391,8 +410,7 @@ TEST_F(MqttSubscriberFbTest, JsonInit1) config.setPropertyValue(PROPERTY_NAME_SUB_JSON_CONFIG, String(VALID_JSON_1_TOPIC_1)); ASSERT_NO_THROW(subFb = clientMqttFb.addFunctionBlock(SUB_FB_NAME, config)); ASSERT_EQ(subFb.getFunctionBlocks().getCount(), 3u); - ASSERT_EQ(subFb.getStatusContainer().getStatus("ComponentStatus"), - Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager())); + EXPECT_TRUE(waitStatusChange(1500, subFb, Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager()))); auto lambda = [&](FunctionBlockPtr nestedFb, std::string value, std::string ts, std::string symbol) { EXPECT_EQ(nestedFb.getSignals()[0].getName().toStdString(), value); @@ -418,8 +436,8 @@ TEST_P(MqttSubscriberFbConfigPTest, JsonWrongInit) config.setPropertyValue(PROPERTY_NAME_SUB_JSON_CONFIG, String(configJson)); ASSERT_NO_THROW(subFb = clientMqttFb.addFunctionBlock(SUB_FB_NAME, config)); EXPECT_EQ(subFb.getFunctionBlocks().getCount(), 0u); - EXPECT_EQ(subFb.getStatusContainer().getStatus("ComponentStatus"), - Enumeration("ComponentStatusType", "Error", daqInstance.getContext().getTypeManager())); + ASSERT_TRUE(waitStatusChange(1500, subFb, Enumeration("ComponentStatusType", "Error", daqInstance.getContext().getTypeManager()))); + ASSERT_TRUE(waitStatusTheSame(1500, subFb, Enumeration("ComponentStatusType", "Error", daqInstance.getContext().getTypeManager()))); EXPECT_EQ(subFb.getPropertyValue(PROPERTY_NAME_SUB_TOPIC).asPtr().toStdString(), ""); } @@ -441,8 +459,7 @@ TEST_P(MqttSubscriberFbConfigFilePTest, JsonInitFromFile) auto config = clientMqttFb.getAvailableFunctionBlockTypes().get(SUB_FB_NAME).createDefaultConfig(); config.setPropertyValue(PROPERTY_NAME_SUB_JSON_CONFIG_FILE, String(configJson)); ASSERT_NO_THROW(subFb = clientMqttFb.addFunctionBlock(SUB_FB_NAME, config)); - ASSERT_EQ(subFb.getStatusContainer().getStatus("ComponentStatus"), - Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager())); + EXPECT_TRUE(waitStatusChange(1500, subFb, Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager()))); } INSTANTIATE_TEST_SUITE_P(JsonConfigTest, @@ -459,8 +476,7 @@ TEST_F(MqttSubscriberFbTest, JsonInitFromFileWithChecking) auto config = clientMqttFb.getAvailableFunctionBlockTypes().get(SUB_FB_NAME).createDefaultConfig(); config.setPropertyValue(PROPERTY_NAME_SUB_JSON_CONFIG_FILE, String("data/public-example0.json")); ASSERT_NO_THROW(subFb = clientMqttFb.addFunctionBlock(SUB_FB_NAME, config)); - ASSERT_EQ(subFb.getStatusContainer().getStatus("ComponentStatus"), - Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager())); + EXPECT_TRUE(waitStatusChange(1500, subFb, Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager()))); ASSERT_EQ(subFb.getFunctionBlocks().getCount(), 3u); auto lambda = [&](FunctionBlockPtr nestedFb, std::string value, std::string ts, std::string symbol) { @@ -486,8 +502,8 @@ TEST_F(MqttSubscriberFbTest, JsonInitFromFileWrongPath) config.setPropertyValue(PROPERTY_NAME_SUB_JSON_CONFIG_FILE, String("/justWrongPath/wrongFile.txt")); ASSERT_NO_THROW(subFb = clientMqttFb.addFunctionBlock(SUB_FB_NAME, config)); EXPECT_EQ(subFb.getFunctionBlocks().getCount(), 0u); - EXPECT_EQ(subFb.getStatusContainer().getStatus("ComponentStatus"), - Enumeration("ComponentStatusType", "Error", daqInstance.getContext().getTypeManager())); + ASSERT_TRUE(waitStatusChange(1500, subFb, Enumeration("ComponentStatusType", "Error", daqInstance.getContext().getTypeManager()))); + ASSERT_TRUE(waitStatusTheSame(1500, subFb, Enumeration("ComponentStatusType", "Error", daqInstance.getContext().getTypeManager()))); EXPECT_EQ(subFb.getPropertyValue(PROPERTY_NAME_SUB_TOPIC).asPtr().toStdString(), ""); } @@ -568,39 +584,39 @@ TEST_F(MqttSubscriberFbTest, WaitingData) auto rawFB = clientMqttFb.addFunctionBlock(SUB_FB_NAME, config); - std::this_thread::sleep_for(std::chrono::milliseconds(50)); + ASSERT_TRUE(waitStatusChange(1000, rawFB, Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager()))); EXPECT_NE(rawFB.getStatusContainer().getStatusMessage("ComponentStatus").toStdString().find("Waiting for data"), std::string::npos); MqttAsyncClientWrapper publisher("testPublisherId"); ASSERT_TRUE(publisher.connect("127.0.0.1")); ASSERT_TRUE(publisher.publishMsg(mqtt::MqttMessage{topic, dataToSend[0], 1, 0})); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + ASSERT_TRUE(waitStatusChange(1500, rawFB, Enumeration("ComponentStatusType", "Ok", daqInstance.getContext().getTypeManager()))); EXPECT_NE(rawFB.getStatusContainer().getStatusMessage("ComponentStatus").toStdString().find("Data has been received"), std::string::npos); - std::this_thread::sleep_for(std::chrono::milliseconds(300)); + ASSERT_TRUE(waitStatusChange(600, rawFB, Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager()))); EXPECT_NE(rawFB.getStatusContainer().getStatusMessage("ComponentStatus").toStdString().find("Waiting for data"), std::string::npos); ASSERT_TRUE(publisher.publishMsg(mqtt::MqttMessage{topic, dataToSend[1], 1, 0})); - std::this_thread::sleep_for(std::chrono::milliseconds(50)); + ASSERT_TRUE(waitStatusChange(1500, rawFB, Enumeration("ComponentStatusType", "Ok", daqInstance.getContext().getTypeManager()))); EXPECT_NE(rawFB.getStatusContainer().getStatusMessage("ComponentStatus").toStdString().find("Data has been received"), std::string::npos); ASSERT_TRUE(publisher.publishMsg(mqtt::MqttMessage{topic, dataToSend[2], 1, 0})); - std::this_thread::sleep_for(std::chrono::milliseconds(50)); + ASSERT_TRUE(waitStatusTheSame(150, rawFB, Enumeration("ComponentStatusType", "Ok", daqInstance.getContext().getTypeManager()))); EXPECT_NE(rawFB.getStatusContainer().getStatusMessage("ComponentStatus").toStdString().find("Data has been received"), std::string::npos); - std::this_thread::sleep_for(std::chrono::milliseconds(200)); + ASSERT_TRUE(waitStatusChange(600, rawFB, Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager()))); EXPECT_NE(rawFB.getStatusContainer().getStatusMessage("ComponentStatus").toStdString().find("Waiting for data"), std::string::npos); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + ASSERT_TRUE(waitStatusTheSame(600, rawFB, Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager()))); EXPECT_NE(rawFB.getStatusContainer().getStatusMessage("ComponentStatus").toStdString().find("Waiting for data"), std::string::npos); rawFB.setPropertyValue(PROPERTY_NAME_SUB_DATA_TIMEOUT, 0); - std::this_thread::sleep_for(std::chrono::milliseconds(50)); + ASSERT_TRUE(waitStatusTheSame(600, rawFB, Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager()))); EXPECT_NE(rawFB.getStatusContainer().getStatusMessage("ComponentStatus").toStdString().find("Waiting for data"), std::string::npos); ASSERT_TRUE(publisher.publishMsg(mqtt::MqttMessage{topic, dataToSend[3], 1, 0})); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + ASSERT_TRUE(waitStatusChange(1500, rawFB, Enumeration("ComponentStatusType", "Ok", daqInstance.getContext().getTypeManager()))); EXPECT_NE(rawFB.getStatusContainer().getStatusMessage("ComponentStatus").toStdString().find("Data has been received"), std::string::npos); - std::this_thread::sleep_for(std::chrono::milliseconds(300)); + ASSERT_TRUE(waitStatusTheSame(600, rawFB, Enumeration("ComponentStatusType", "Ok", daqInstance.getContext().getTypeManager()))); EXPECT_NE(rawFB.getStatusContainer().getStatusMessage("ComponentStatus").toStdString().find("Data has been received"), std::string::npos); } @@ -672,6 +688,7 @@ TEST_F(MqttSubscriberFbTest, CheckRawFbFullDataTransferWithReconfiguring) MqttAsyncClientWrapper publisher("testPublisherId"); ASSERT_TRUE(publisher.connect("127.0.0.1")); + ASSERT_TRUE(waitStatusChange(1000, rawFB, Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager()))); EXPECT_NE(rawFB.getStatusContainer().getStatusMessage("ComponentStatus").toStdString().find("Waiting for data"), std::string::npos); mqtt::MqttMessage msg = {topic0, dataToSend[0], 2, 0}; @@ -695,14 +712,9 @@ TEST_F(MqttSubscriberFbTest, CheckRawFbFullDataTransferWithReconfiguring) } } }; - helper::utils::Timer tmr(1000, true); - - bool hasData = false; - while (tmr.expired() == false && hasData == false) - hasData = (rawFB.getStatusContainer().getStatusMessage("ComponentStatus").toStdString().find("Data has been received") != std::string::npos); - - EXPECT_TRUE(hasData); + EXPECT_TRUE(waitStatusChange(1000, rawFB, Enumeration("ComponentStatusType", "Ok", daqInstance.getContext().getTypeManager()))); + EXPECT_TRUE(rawFB.getStatusContainer().getStatusMessage("ComponentStatus").toStdString().find("Data has been received") != std::string::npos); readerLambda(); ASSERT_EQ(dataToReceive.size(), 1u); @@ -711,19 +723,14 @@ TEST_F(MqttSubscriberFbTest, CheckRawFbFullDataTransferWithReconfiguring) dataToReceive.clear(); ASSERT_NO_THROW(rawFB.setPropertyValue(PROPERTY_NAME_SUB_TOPIC, topic1)); - EXPECT_EQ(rawFB.getStatusContainer().getStatus("ComponentStatus"), - Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager())); + EXPECT_TRUE(waitStatusChange(1000, rawFB, Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager()))); EXPECT_NE(rawFB.getStatusContainer().getStatusMessage("ComponentStatus").toStdString().find("Waiting for data"), std::string::npos); msg = {topic1, dataToSend[1], 2, 0}; ASSERT_TRUE(publisher.publishMsg(msg)); - tmr.restart(); - - hasData = false; - while (tmr.expired() == false && hasData == false) - hasData = (rawFB.getStatusContainer().getStatusMessage("ComponentStatus").toStdString().find("Data has been received") != std::string::npos); - EXPECT_TRUE(hasData); + EXPECT_TRUE(waitStatusChange(1000, rawFB, Enumeration("ComponentStatusType", "Ok", daqInstance.getContext().getTypeManager()))); + EXPECT_TRUE(rawFB.getStatusContainer().getStatusMessage("ComponentStatus").toStdString().find("Data has been received") != std::string::npos); readerLambda(); ASSERT_EQ(dataToReceive.size(), 1u); @@ -742,44 +749,42 @@ TEST_F(MqttSubscriberFbTest, DomainDataPacketWithTheSameTS) config.setPropertyValue(PROPERTY_NAME_SUB_TOPIC, topic); config.setPropertyValue(PROPERTY_NAME_SUB_PREVIEW_SIGNAL, True); config.setPropertyValue(PROPERTY_NAME_SUB_PREVIEW_SIGNAL_TS_MODE, static_cast(SDSM::SystemTime)); + config.setPropertyValue(PROPERTY_NAME_SUB_DATA_TIMEOUT, 0); auto fb = clientMqttFb.addFunctionBlock(SUB_FB_NAME, config); auto getTime = []() { return duration_cast(system_clock::now().time_since_epoch()).count(); }; - auto getComponentStatus = [&]() { return fb.getStatusContainer().getStatus("ComponentStatus"); }; auto getStatusMsg = [&]() { return fb.getStatusContainer().getStatusMessage("ComponentStatus").toStdString(); }; + const auto warning = Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager()); const auto ok = Enumeration("ComponentStatusType", "Ok", daqInstance.getContext().getTypeManager()); const auto ts = getTime(); auto packet = createDomainDataPacket(fb, ts); - - ASSERT_EQ(getComponentStatus(), ok); + ASSERT_TRUE(waitStatusTheSame(1500, fb, ok)); packet = createDomainDataPacket(fb, ts); - - EXPECT_EQ(getComponentStatus(), warning); + ASSERT_TRUE(waitStatusChange(1500, fb, warning)); EXPECT_NE(getStatusMsg().find(warnMsg), std::string::npos); packet = createDomainDataPacket(fb, getTime()); - - EXPECT_EQ(getComponentStatus(), warning); + ASSERT_TRUE(waitStatusTheSame(1500, fb, warning)); EXPECT_NE(getStatusMsg().find(warnMsg), std::string::npos); // reconfiguring should reset warning fb.setPropertyValue(PROPERTY_NAME_SUB_QOS, 2); - EXPECT_EQ(getComponentStatus(), warning); + ASSERT_TRUE(waitStatusTheSame(1500, fb, warning)); EXPECT_EQ(getStatusMsg().find(warnMsg), std::string::npos); EXPECT_NE(getStatusMsg().find("Waiting for data"), std::string::npos); packet = createDomainDataPacket(fb, getTime()); - EXPECT_EQ(getComponentStatus(), ok); + ASSERT_TRUE(waitStatusChange(1500, fb, ok)); EXPECT_EQ(getStatusMsg().find(warnMsg), std::string::npos); packet = createDomainDataPacket(fb, getTime()); - EXPECT_EQ(getComponentStatus(), ok); + ASSERT_TRUE(waitStatusTheSame(1500, fb, ok)); EXPECT_EQ(getStatusMsg().find(warnMsg), std::string::npos); }