/* * Copyright (c) 2010-2018 Isode Limited. * All rights reserved. * See the COPYING file for more information. */ #include #include #include #include #include #include #include #include #include #include namespace Swift { BonjourQuerier::BonjourQuerier(EventLoop* eventLoop) : eventLoop(eventLoop), stopRequested(false), thread(nullptr) { int fds[2]; int result = pipe(fds); assert(result == 0); (void) result; interruptSelectReadSocket = fds[0]; fcntl(interruptSelectReadSocket, F_SETFL, fcntl(interruptSelectReadSocket, F_GETFL)|O_NONBLOCK); interruptSelectWriteSocket = fds[1]; } BonjourQuerier::~BonjourQuerier() { assert(!thread); } std::shared_ptr BonjourQuerier::createBrowseQuery() { return std::make_shared(shared_from_this(), eventLoop); } std::shared_ptr BonjourQuerier::createRegisterQuery(const std::string& name, unsigned short port, const ByteArray& info) { return std::make_shared(name, port, info, shared_from_this(), eventLoop); } std::shared_ptr BonjourQuerier::createResolveServiceQuery(const DNSSDServiceID& service) { return std::make_shared(service, shared_from_this(), eventLoop); } std::shared_ptr BonjourQuerier::createResolveHostnameQuery(const std::string& hostname, int interfaceIndex) { return std::make_shared(hostname, interfaceIndex, shared_from_this(), eventLoop); } void BonjourQuerier::addRunningQuery(std::shared_ptr query) { { std::lock_guard lock(runningQueriesMutex); runningQueries.push_back(query); } runningQueriesAvailableEvent.notify_one(); interruptSelect(); } void BonjourQuerier::removeRunningQuery(std::shared_ptr query) { { std::lock_guard lock(runningQueriesMutex); erase(runningQueries, query); } } void BonjourQuerier::interruptSelect() { char c = 0; write(interruptSelectWriteSocket, &c, 1); } void BonjourQuerier::start() { assert(!thread); thread = new std::thread(boost::bind(&BonjourQuerier::run, shared_from_this())); } void BonjourQuerier::stop() { if (thread) { stopRequested = true; assert(runningQueries.empty()); runningQueriesAvailableEvent.notify_one(); interruptSelect(); thread->join(); delete thread; thread = nullptr; stopRequested = false; } } void BonjourQuerier::run() { while (!stopRequested) { fd_set fdSet; int maxSocket; { std::unique_lock lock(runningQueriesMutex); if (runningQueries.empty()) { runningQueriesAvailableEvent.wait(lock); if (runningQueries.empty()) { continue; } } // Run all running queries FD_ZERO(&fdSet); maxSocket = interruptSelectReadSocket; FD_SET(interruptSelectReadSocket, &fdSet); for (const auto& query : runningQueries) { int socketID = query->getSocketID(); maxSocket = std::max(maxSocket, socketID); FD_SET(socketID, &fdSet); } } if (select(maxSocket+1, &fdSet, nullptr, nullptr, nullptr) <= 0) { continue; } if (FD_ISSET(interruptSelectReadSocket, &fdSet)) { char dummy; while (read(interruptSelectReadSocket, &dummy, 1) > 0) {} } { std::lock_guard lock(runningQueriesMutex); for (auto&& query : runningQueries) { if (FD_ISSET(query->getSocketID(), &fdSet)) { query->processResult(); } } } } } }