summaryrefslogtreecommitdiffstats
blob: 93259c1024da927d336e4e85cc9857de10c50b69 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
#include "Swiften/LinkLocal/BonjourQuerier.h"

#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <fcntl.h>

#include "Swiften/EventLoop/MainEventLoop.h"
#include "Swiften/LinkLocal/BonjourBrowseQuery.h"
#include "Swiften/LinkLocal/BonjourPublishQuery.h"
#include "Swiften/Base/foreach.h"

namespace Swift {

// Constructs a querier with no worker thread running, and creates the pipe
// whose read end is watched by the select() call in run(). Writing a byte to
// the other end (see interruptSelect()) makes that select() return.
BonjourQuerier::BonjourQuerier() : stopRequested(false), thread(0) {
	// Pre-set to an invalid descriptor, so that a failing pipe() cannot leave
	// the members below holding indeterminate values in builds where assert()
	// is compiled out.
	int fds[2] = { -1, -1 };
	int result = pipe(fds);
	assert(result == 0);
	(void) result; // Only read by the assert above; silences the NDEBUG unused warning.
	interruptSelectReadSocket = fds[0];
	interruptSelectWriteSocket = fds[1];

	// Both ends are made non-blocking:
	// - read end: run() drains it by calling read() until nothing is left;
	// - write end: when the pipe is full, the wake-up byte can simply be
	//   dropped, because the unread bytes already keep the read end readable.
	//   A blocking write would stall the caller of interruptSelect() instead.
	for (int i = 0; i < 2; ++i) {
		// Skip F_SETFL if the flags cannot be read (e.g. pipe() failed), rather
		// than OR-ing O_NONBLOCK into the -1 error value.
		const int flags = fcntl(fds[i], F_GETFL);
		if (flags != -1) {
			fcntl(fds[i], F_SETFL, flags | O_NONBLOCK);
		}
	}
}

// Stops the worker thread (if one is running) and releases the wake-up pipe
// that the constructor created.
BonjourQuerier::~BonjourQuerier() {
	stop();
	// stop() has joined the worker thread, so nothing is select()ing on the
	// pipe any more. Without these calls every querier leaks two descriptors.
	close(interruptSelectReadSocket);
	close(interruptSelectWriteSocket);
}

// Creates a BonjourBrowseQuery that is handed a shared pointer to this
// querier, and returns it through the generic DNSSDBrowseQuery interface.
boost::shared_ptr<DNSSDBrowseQuery> BonjourQuerier::createBrowseQuery() {
	boost::shared_ptr<DNSSDBrowseQuery> browseQuery(
			new BonjourBrowseQuery(shared_from_this()));
	return browseQuery;
}

// Creates a BonjourPublishQuery from the given name, port and service info,
// hands it a shared pointer to this querier, and returns it through the
// generic DNSSDPublishQuery interface.
boost::shared_ptr<DNSSDPublishQuery> BonjourQuerier::createPublishQuery(const String& name, int port, const LinkLocalServiceInfo& info) {
	boost::shared_ptr<DNSSDPublishQuery> publishQuery(
			new BonjourPublishQuery(name, port, info, shared_from_this()));
	return publishQuery;
}

// Adds query to the list of queries whose sockets are watched by run().
//
// The list is modified under runningQueriesMutex. Afterwards the worker is
// poked in both places where run() can be blocked: the condition variable it
// waits on while the list is empty, and the select() call, which is woken
// through the interrupt pipe so that it rebuilds its descriptor set.
void BonjourQuerier::addRunningQuery(boost::shared_ptr<BonjourQuery> query) {
	{
		boost::lock_guard<boost::mutex> guard(runningQueriesMutex);
		runningQueries.insert(runningQueries.end(), query);
	}
	// The mutex is released before signalling.
	runningQueriesAvailableEvent.notify_one();
	interruptSelect();
}

// Removes every occurrence of query from the list of running queries.
// The list is modified under runningQueriesMutex; the worker thread is not
// woken up by this call.
void BonjourQuerier::removeRunningQuery(boost::shared_ptr<BonjourQuery> query) {
	boost::lock_guard<boost::mutex> guard(runningQueriesMutex);
	// Erase-remove: std::remove moves the entries to keep to the front and
	// returns the new logical end, erase() then drops the tail.
	runningQueries.erase(
			std::remove(runningQueries.begin(), runningQueries.end(), query),
			runningQueries.end());
}

// Writes a single byte to the write end of the interrupt pipe. run() includes
// the read end in its select() set, so this makes a pending select() return.
void BonjourQuerier::interruptSelect() {
	// Only the arrival of the byte matters, not its value.
	const char wakeUpByte = '\0';
	write(interruptSelectWriteSocket, &wakeUpByte, sizeof(wakeUpByte));
}

void BonjourQuerier::start() {
	stop();
	thread = new boost::thread(boost::bind(&BonjourQuerier::run, shared_from_this()));
}

void BonjourQuerier::stop() {
	if (thread) {
		stopRequested = true;
		runningQueries.clear(); // TODO: Is this the right thing to do?
		runningQueriesAvailableEvent.notify_one();
		interruptSelect();
		thread->join();
		delete thread;
		stopRequested = false;
	}
}

// Returns true if socketID may be stored in, or tested against, an fd_set.
// FD_SET and FD_ISSET are undefined for descriptors outside [0, FD_SETSIZE).
static bool fitsInFDSet(int socketID) {
	return socketID >= 0 && socketID < static_cast<int>(FD_SETSIZE);
}

// Worker thread body; loops until stopRequested is set.
//
// Each iteration:
//  1. sleeps on runningQueriesAvailableEvent while there are no queries;
//  2. builds an fd_set from the interrupt pipe and every running query's
//     socket, and blocks in select() without a timeout;
//  3. drains the interrupt pipe if that is what woke select();
//  4. for every running query whose socket is readable, posts a
//     BonjourQuery::processResult call to the main event loop.
// runningQueries is only touched while runningQueriesMutex is held. The mutex
// is released during select(), so the set of queries may change while
// blocked; addRunningQuery() interrupts select() to have the set rebuilt.
void BonjourQuerier::run() {
	while (!stopRequested) {
		fd_set fdSet;
		int maxSocket;
		{
			boost::unique_lock<boost::mutex> lock(runningQueriesMutex);
			if (runningQueries.empty()) {
				// Re-check the stop flag with the mutex held before going to
				// sleep: a stop request that arrived after the loop condition
				// was evaluated would otherwise be missed, and nothing would
				// wake this thread up again.
				if (!stopRequested) {
					runningQueriesAvailableEvent.wait(lock);
				}
				// Woken by stop(), spuriously, or the query was removed again:
				// let the loop condition decide what to do next.
				if (stopRequested || runningQueries.empty()) {
					continue;
				}
			}

			// Run all running queries
			FD_ZERO(&fdSet);
			maxSocket = interruptSelectReadSocket;
			FD_SET(interruptSelectReadSocket, &fdSet);

			foreach(const boost::shared_ptr<BonjourQuery>& query, runningQueries) {
				const int socketID = query->getSocketID();
				// Descriptors that cannot be represented in an fd_set are left
				// out instead of invoking undefined behavior in FD_SET.
				if (fitsInFDSet(socketID)) {
					maxSocket = std::max(maxSocket, socketID);
					FD_SET(socketID, &fdSet);
				}
			}
		}

		// No timeout: block until a socket or the interrupt pipe is readable.
		// On error, start over and rebuild the set from the current queries.
		if (select(maxSocket+1, &fdSet, NULL, NULL, NULL) <= 0) {
			continue;
		}

		if (FD_ISSET(interruptSelectReadSocket, &fdSet)) {
			// Discard all pending wake-up bytes; the read end is non-blocking,
			// so read() stops returning data once the pipe is empty.
			char dummy;
			while (read(interruptSelectReadSocket, &dummy, 1) > 0) {}
		}

		{
			boost::lock_guard<boost::mutex> lock(runningQueriesMutex);
			foreach(const boost::shared_ptr<BonjourQuery>& query, runningQueries) {
				// Fetch the descriptor once: FD_ISSET may be a macro that
				// evaluates its argument more than once.
				const int socketID = query->getSocketID();
				if (fitsInFDSet(socketID) && FD_ISSET(socketID, &fdSet)) {
					MainEventLoop::postEvent(
							boost::bind(&BonjourQuery::processResult, query), shared_from_this());
				}
			}
		}
	}
}

}