1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
|
#include "Swiften/LinkLocal/BonjourQuerier.h"

#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <fcntl.h>

#include <cerrno>

#include "Swiften/LinkLocal/BonjourBrowseQuery.h"
#include "Swiften/LinkLocal/BonjourPublishQuery.h"
#include "Swiften/Base/foreach.h"
namespace Swift {
// Sets up the querier in the "not running" state and creates the pipe whose
// read end is added to the select() set in run(), so that interruptSelect()
// can wake a blocked select() by writing to the other end.
BonjourQuerier::BonjourQuerier() : stopRequested(false), thread(0) {
    int pipeEnds[2];
    int pipeStatus = pipe(pipeEnds);
    assert(pipeStatus == 0);

    interruptSelectReadSocket = pipeEnds[0];
    interruptSelectWriteSocket = pipeEnds[1];

    // Only the read end is switched to non-blocking mode: run() drains it in
    // a loop and relies on read() returning instead of blocking once the pipe
    // is empty.
    int readEndFlags = fcntl(interruptSelectReadSocket, F_GETFL);
    fcntl(interruptSelectReadSocket, F_SETFL, readEndFlags | O_NONBLOCK);
}
// Shuts down the worker thread (if any) and releases the interrupt pipe.
BonjourQuerier::~BonjourQuerier() {
    stop();

    // The pipe created in the constructor is owned by this object. Close both
    // ends here, after stop() has joined the worker, so that no thread can
    // still be using them in select()/read()/write().
    ::close(interruptSelectReadSocket);
    ::close(interruptSelectWriteSocket);
}
// Creates a new browse query bound to this querier.
// The query receives a shared reference to the querier (shared_from_this()).
boost::shared_ptr<DNSSDBrowseQuery> BonjourQuerier::createBrowseQuery() {
    boost::shared_ptr<DNSSDBrowseQuery> browseQuery(
            new BonjourBrowseQuery(shared_from_this()));
    return browseQuery;
}
// Creates a new publish query bound to this querier.
//
// name, port and info are forwarded unchanged to BonjourPublishQuery, together
// with a shared reference to the querier (shared_from_this()).
boost::shared_ptr<DNSSDPublishQuery> BonjourQuerier::createPublishQuery(const String& name, int port, const LinkLocalServiceInfo& info) {
    boost::shared_ptr<DNSSDPublishQuery> publishQuery(
            new BonjourPublishQuery(name, port, info, shared_from_this()));
    return publishQuery;
}
// Registers a query with the worker loop.
//
// The query is appended to runningQueries under runningQueriesMutex. The
// worker is then woken both ways: via the condition variable (in case it is
// waiting for the list to become non-empty) and via the interrupt pipe (in
// case it is blocked in select()).
void BonjourQuerier::addRunningQuery(boost::shared_ptr<BonjourQuery> query) {
    boost::unique_lock<boost::mutex> queriesLock(runningQueriesMutex);
    runningQueries.push_back(query);
    // Release the mutex before signalling.
    queriesLock.unlock();

    runningQueriesAvailableEvent.notify_one();
    interruptSelect();
}
// Unregisters a query from the worker loop.
//
// Every entry of runningQueries that compares equal to `query` is erased
// while holding runningQueriesMutex. The worker is not woken.
void BonjourQuerier::removeRunningQuery(boost::shared_ptr<BonjourQuery> query) {
    boost::lock_guard<boost::mutex> queriesLock(runningQueriesMutex);
    // Erase-remove: shift the survivors forward, then trim the tail.
    runningQueries.erase(
            std::remove(runningQueries.begin(), runningQueries.end(), query),
            runningQueries.end());
}
// Wakes the worker thread out of select() by writing a single byte to the
// interrupt pipe; run() watches the read end of that pipe.
void BonjourQuerier::interruptSelect() {
    const char wakeUpByte = 0;
    ssize_t written;
    do {
        written = write(interruptSelectWriteSocket, &wakeUpByte, 1);
        // A write cut short by a signal has written nothing; retry so that
        // the wake-up is not silently lost.
    } while (written < 0 && errno == EINTR);
    // Any other failure is deliberately ignored: this is a best-effort
    // notification and there is no caller-visible way to report it.
}
// (Re)starts the worker thread.
//
// Any worker that is already running is stopped first. The new thread
// executes run() and is bound to a shared reference to this object
// (shared_from_this()), so it holds a reference to the querier.
void BonjourQuerier::start() {
    stop();

    thread = new boost::thread(
            boost::bind(&BonjourQuerier::run, shared_from_this()));
}
// Stops the worker thread, if one is running, and waits for it to exit.
//
// All registered queries are dropped. On return `thread` is null and
// stopRequested is reset, so stop() may safely be called again (start() and
// the destructor both do so) and start() may launch a fresh worker.
void BonjourQuerier::stop() {
    if (!thread) {
        return;
    }

    {
        // The stop flag and the query list are changed under the same mutex
        // that run() holds while it decides whether to go to sleep. That way
        // the worker either sees the request before waiting, or is already
        // waiting when the notification below is sent.
        boost::lock_guard<boost::mutex> lock(runningQueriesMutex);
        stopRequested = true;
        runningQueries.clear(); // TODO: Is this the right thing to do?
    }

    // Wake the worker wherever it is parked: on the condition variable or
    // inside select().
    runningQueriesAvailableEvent.notify_one();
    interruptSelect();

    thread->join();
    delete thread;
    thread = 0; // Do not leave a dangling pointer for the next stop()/start().
    stopRequested = false;
}

// Worker thread body: multiplexes the sockets of all running queries.
//
// Each iteration:
//  1. sleeps until at least one query is registered (or a stop is requested),
//  2. builds a read set from the interrupt pipe plus every query socket,
//  3. blocks in select() without a timeout,
//  4. drains the interrupt pipe if it fired,
//  5. calls processResult() on every query whose socket is readable.
// The loop ends once stopRequested is observed.
void BonjourQuerier::run() {
    while (!stopRequested) {
        fd_set fdSet;
        int maxSocket;
        {
            boost::unique_lock<boost::mutex> lock(runningQueriesMutex);

            // Predicate loop: tolerates spurious wake-ups, and rechecks the
            // stop flag under the mutex so a stop request issued just before
            // the wait is not missed.
            while (runningQueries.empty() && !stopRequested) {
                runningQueriesAvailableEvent.wait(lock);
            }
            if (stopRequested) {
                break;
            }

            // Run all running queries
            FD_ZERO(&fdSet);
            maxSocket = interruptSelectReadSocket;
            FD_SET(interruptSelectReadSocket, &fdSet);
            foreach(const boost::shared_ptr<BonjourQuery>& query, runningQueries) {
                int socketID = query->getSocketID();
                maxSocket = std::max(maxSocket, socketID);
                FD_SET(socketID, &fdSet);
            }
        }

        // Block (null timeout) until a query socket or the interrupt pipe is
        // readable. The mutex is not held here, so queries can be added or
        // removed meanwhile; on error or no ready descriptor, start over.
        if (select(maxSocket + 1, &fdSet, NULL, NULL, NULL) <= 0) {
            continue;
        }

        if (FD_ISSET(interruptSelectReadSocket, &fdSet)) {
            // Swallow every pending wake-up byte. The read end is
            // non-blocking, so read() stops returning > 0 once the pipe is
            // empty instead of blocking.
            char dummy;
            while (read(interruptSelectReadSocket, &dummy, 1) > 0) {}
        }

        {
            // processResult() runs with runningQueriesMutex held. The list is
            // re-read here, so it may differ from the one used to build fdSet.
            boost::lock_guard<boost::mutex> lock(runningQueriesMutex);
            foreach(boost::shared_ptr<BonjourQuery> query, runningQueries) {
                if (FD_ISSET(query->getSocketID(), &fdSet)) {
                    query->processResult();
                }
            }
        }
    }
}
}
|