URY playd
C++ minimalist audio player
io.cpp
Go to the documentation of this file.
1 // This file is part of playd.
2 // playd is licensed under the MIT licence: see LICENSE.txt.
3 
17 #include <algorithm>
18 #include <cassert>
19 #include <csignal>
20 #include <cstring>
21 #include <sstream>
22 #include <string>
23 
24 // If UNICODE is defined on Windows, it'll select the wide-char gai_strerror.
25 // We don't want this.
26 #undef UNICODE
27 // Use the same ssize_t as libmpg123 on Windows.
28 #ifndef _MSC_VER
29 typedef long ssize_t;
30 #define _SSIZE_T_
31 #define _SSIZE_T_DEFINED
32 #endif
33 #include <uv.h>
34 
35 #include "errors.hpp"
36 #include "messages.h"
37 #include "player.hpp"
38 #include "response.hpp"
39 
40 #include "io.hpp"
41 
42 const std::uint16_t IoCore::PLAYER_UPDATE_PERIOD = 5; // ms
43 
44 //
45 // libuv callbacks
46 //
47 // These should generally trampoline back into class methods.
48 //
49 
51 void UvAlloc(uv_handle_t *, size_t suggested_size, uv_buf_t *buf)
52 {
53  // Since we used new here, we need to use delete when we finish reading.
54  *buf = uv_buf_init(new char[suggested_size](), suggested_size);
55 }
56 
58 void UvCloseCallback(uv_handle_t *handle)
59 {
60  assert(handle != nullptr);
61  delete handle;
62 }
63 
65 void UvReadCallback(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
66 {
67  assert(stream != nullptr);
68 
69  auto *tcp = static_cast<Connection *>(stream->data);
70  assert(tcp != nullptr);
71 
72  // NB: Read delete[]s buf->base, so we don't.
73  tcp->Read(nread, buf);
74 
75  // We don't delete the handle.
76  // It will be used for future reads on this client!
77 }
78 
80 void UvListenCallback(uv_stream_t *server, int status)
81 {
82  assert(server != nullptr);
83  if (status < 0) return;
84 
85  auto *io = static_cast<IoCore *>(server->data);
86  assert(io != nullptr);
87 
88  io->Accept(server);
89 }
90 
92 void UvWriteCallback(uv_write_t *req, int status)
93 {
94  assert(req != nullptr);
95 
96  if (status) {
97  Debug() << "UvRespondCallback: got status:" << status
98  << std::endl;
99  }
100 
101  // We receive the write buffer as the write_t's data pointer.
102  // This is because something has to delete[] it,
103  // and we drew the short straw.
104  auto *buf = static_cast<char *>(req->data);
105  assert(buf != nullptr);
106 
107  delete[] buf;
108 
109  // This handle was created specifically for this shutdown.
110  // We thus have to delete it.
111  delete req;
112 }
113 
115 void UvUpdateTimerCallback(uv_timer_t *handle)
116 {
117  assert(handle != nullptr);
118 
119  auto *io = static_cast<IoCore *>(handle->data);
120  assert(io != nullptr);
121 
122  io->UpdatePlayer();
123 
124  // We don't delete the handle.
125  // It is being used for other timer fires.
126 }
127 
129 void UvSigintCallback(uv_signal_t *handle, int signum)
130 {
131  assert(handle != nullptr);
132 
133  auto *player = static_cast<Player *>(handle->data);
134  assert(player != nullptr);
135 
136  if (signum != SIGINT) return;
137 
138  Debug() << "Caught SIGINT, closing..." << std::endl;
139  player->Quit(Response::NOREQUEST);
140 
141  // We don't delete the handle.
142  // It is being used for other signals.
143 }
144 
146 void UvShutdownCallback(uv_shutdown_t *handle, int status)
147 {
148  assert(handle != nullptr);
149 
150  if (status) {
151  Debug() << "UvShutdownCallback: got status:" << status
152  << std::endl;
153  }
154 
155  auto *conn = static_cast<Connection *>(handle->data);
156  assert(conn != nullptr);
157 
158  // Now actually tell the client to die off.
159  conn->Depool();
160 
161  // This handle was created specifically for this shutdown.
162  // We thus have to delete it.
163  delete handle;
164 }
165 
166 //
167 // IoCore
168 //
169 
170 IoCore::IoCore(Player &player) : loop(nullptr), player(player)
171 {
172 }
173 
174 void IoCore::Run(const std::string &host, const std::string &port)
175 {
176  this->loop = uv_default_loop();
177  if (this->loop == nullptr) throw InternalError(MSG_IO_CANNOT_ALLOC);
178 
179  this->InitAcceptor(host, port);
180  this->InitSignals();
181  this->InitUpdateTimer();
182 
183  uv_run(this->loop, UV_RUN_DEFAULT);
184 
185  // We presume all of the open handles have been closed in Shutdown().
186  // We need only close the loop.
187  uv_loop_close(this->loop);
188 }
189 
190 void IoCore::Accept(uv_stream_t *server)
191 {
192  assert(server != nullptr);
193  assert(this->loop != nullptr);
194 
195  auto client = new uv_tcp_t();
196  uv_tcp_init(this->loop, client);
197 
198  // libuv does the 'nonzero is error' thing here
199  if (uv_accept(server, (uv_stream_t *)client)) {
200  uv_close((uv_handle_t *)client, UvCloseCallback);
201  return;
202  }
203 
204  auto id = this->NextConnectionID();
205  auto conn = std::make_shared<Connection>(*this, client, this->player, id);
206  client->data = static_cast<void *>(conn.get());
207  this->pool[id - 1] = std::move(conn);
208 
209  // Begin initial responses
211  .AddArg(std::to_string(id))
212  .AddArg(MSG_OHAI_BIFROST)
213  .AddArg(MSG_OHAI_PLAYD));
215  .AddArg("player/file"));
216  this->player.Dump(id, Response::NOREQUEST);
218  // End initial responses
219 
220  uv_read_start((uv_stream_t *)client, UvAlloc, UvReadCallback);
221 }
222 
224 {
225  // We'll want to try and use an existing, empty ID in the connection
226  // pool. If there aren't any (we've exceeded the maximum-so-far number
227  // of simultaneous connections), we expand the pool.
228  if (this->free_list.empty()) this->ExpandPool();
229  assert(!this->free_list.empty());
230 
231  // Acquire some free ID, and ensure that the ID cannot be re-used until
232  // replaced onto the free list by the connection's removal.
233  size_t id = this->free_list.back();
234  this->free_list.pop_back();
235 
236  // client_slot should be at least 1, because of the above.
237  assert(0 < id);
238  assert(id <= this->pool.size());
239 
240  return id;
241 }
242 
244 {
245  // If we already have SIZE_MAX-1 simultaneous connections, we bail out.
246  // Since this is at least 65,534, and likely to be 2^32-2 or 2^64-2,
247  // this is incredibly unlikely to happen and probably means someone's
248  // trying to denial-of-service an audio player.
249  //
250  // Why -1? Because slot 0 in the connection pool is reserved for
251  // broadcasts.
252  bool full = this->pool.size() == (SIZE_MAX - 1);
253  if (full) throw InternalError(MSG_TOO_MANY_CONNS);
254 
255  this->pool.emplace_back(nullptr);
256  // This isn't an off-by-one error; slots index from 1.
257  this->free_list.push_back(this->pool.size());
258 }
259 
260 void IoCore::Remove(size_t slot)
261 {
262  assert(0 < slot && slot <= this->pool.size());
263 
264  // Don't remove if it's already a nullptr, because we'd end up with the
265  // slot on the free list twice.
266  if (this->pool.at(slot - 1)) {
267  this->pool[slot - 1] = nullptr;
268  this->free_list.push_back(slot);
269  }
270 
271  assert(!this->pool.at(slot - 1));
272 }
273 
275 {
276  bool running = this->player.Update();
277  if (!running) this->Shutdown();
278 }
279 
281 {
282  Debug() << "Shutting down..." << std::endl;
283 
284  // If the player is ready to terminate, we need to kill the event loop
285  // in order to disconnect clients and stop the updating.
286  // We do this by stopping everything using the loop.
287 
288  // First, the update timer:
289  uv_timer_stop(&this->updater);
290 
291  // Then, the TCP server (as far as we can tell, this does *not* close
292  // down the connections):
293  uv_close(reinterpret_cast<uv_handle_t *>(&this->server), nullptr);
294 
295  // Next, ask each connection to stop.
296  for (const auto conn : this->pool) {
297  if (conn) conn->Shutdown();
298  }
299 
300  // Finally, unregister signal processing.
301  uv_signal_stop(&this->sigint);
302  uv_close(reinterpret_cast<uv_handle_t *>(&this->sigint), nullptr);
303 }
304 
305 void IoCore::Respond(size_t id, const Response &response) const
306 {
307  if (this->pool.empty()) return;
308 
309  if (id == 0) {
310  this->Broadcast(response);
311  } else {
312  this->Unicast(id, response);
313  }
314 }
315 
316 void IoCore::Broadcast(const Response &response) const
317 {
318  Debug() << "broadcast:" << response.Pack() << std::endl;
319 
320  // Copy the connection by value, so that there's at least one
321  // active reference to it throughout.
322  for (const auto c : this->pool) {
323  if (c) c->Respond(response);
324  }
325 }
326 
327 void IoCore::Unicast(size_t id, const Response &response) const
328 {
329  assert(0 < id && id <= this->pool.size());
330 
331  Debug() << "unicast @" << std::to_string(id) << ":" << response.Pack()
332  << std::endl;
333 
334  auto c = this->pool.at(id - 1);
335  if (c) c->Respond(response);
336 }
337 
339 {
340  assert(this->loop != nullptr);
341 
342  uv_timer_init(this->loop, &this->updater);
343  this->updater.data = static_cast<void *>(this);
344 
345  uv_timer_start(&this->updater, UvUpdateTimerCallback, 0,
347 }
348 
349 void IoCore::InitAcceptor(const std::string &address, const std::string &port)
350 {
351  assert(this->loop != nullptr);
352 
353  if (uv_tcp_init(this->loop, &this->server)) {
355  }
356  this->server.data = static_cast<void *>(this);
357  assert(this->server.data != nullptr);
358 
359  struct sockaddr_in bind_addr;
360  uv_ip4_addr(address.c_str(), std::stoi(port), &bind_addr);
361  uv_tcp_bind(&this->server, (const sockaddr *)&bind_addr, 0);
362 
363  int r = uv_listen((uv_stream_t *)&this->server, 128, UvListenCallback);
364  if (r) {
365  throw NetError("Could not listen on " + address + ":" + port +
366  " (" + std::string(uv_err_name(r)) + ")");
367  }
368 
369  Debug() << "Listening at" << address << "on" << port << std::endl;
370 }
371 
373 {
374  int r = uv_signal_init(this->loop, &this->sigint);
375  if (r) {
376  throw InternalError(MSG_IO_CANNOT_ALLOC + ": " +
377  std::string(uv_err_name(r)));
378  }
379 
380  // We pass the player, not the IoCore.
381  // This is so the SIGINT handler can tell the player to quit,
382  // which then tells us to shutdown
383  this->sigint.data = static_cast<void *>(&this->player);
384  assert(this->sigint.data != nullptr);
385  uv_signal_start(&this->sigint, UvSigintCallback, SIGINT);
386 }
387 
388 //
389 // Connection
390 //
391 
392 Connection::Connection(IoCore &parent, uv_tcp_t *tcp, Player &player, size_t id)
393  : parent(parent), tcp(tcp), tokeniser(), player(player), id(id)
394 {
395  Debug() << "Opening connection from" << Name() << std::endl;
396 }
397 
399 {
400  Debug() << "Closing connection from" << Name() << std::endl;
401  uv_close(reinterpret_cast<uv_handle_t *>(this->tcp), UvCloseCallback);
402 }
403 
404 void Connection::Respond(const Response &response)
405 {
406  // Pack provides us the response's wire format, except the newline.
407  // We can provide that here.
408  auto string = response.Pack();
409  string.push_back('\n');
410 
411  unsigned int l = string.length();
412 
413  // Make a libuv buffer and pour the request into it.
414  // The onus is on UvRespondCallback to free buf.base.
415  auto buf = uv_buf_init(new char[l], l);
416  assert(buf.base != nullptr);
417  string.copy(buf.base, l);
418 
419  // Make a write request.
420  // Since the callback must free the buffer, pass it through as data.
421  // The callback will also free req.
422  auto req = new uv_write_t;
423  req->data = static_cast<void *>(buf.base);
424 
425  uv_write((uv_write_t *)req, (uv_stream_t *)this->tcp, &buf, 1,
427 }
428 
429 std::string Connection::Name()
430 {
431  // Warning: fairly low-level Berkeley sockets code ahead!
432  // (Thankfully, libuv makes sure the appropriate headers are included.)
433 
434  // Using this instead of struct sockaddr is advised by the libuv docs,
435  // for IPv6 compatibility.
436  struct sockaddr_storage s;
437  auto sp = (struct sockaddr *)&s;
438 
439  // Turns out if you don't do this, Windows (and only Windows?) is upset.
440  socklen_t namelen = sizeof(s);
441 
442  int pe = uv_tcp_getpeername(this->tcp, sp, (int *)&namelen);
443  // These std::string()s are needed as, otherwise, the compiler would
444  // think we're trying to add const char*s together. We need AT LEAST
445  // ONE of the sides of the first + to be a std::string.
446  if (pe) return "<error@peer: " + std::string(uv_strerror(pe)) + ">";
447 
448  // Now, split the sockaddr into host and service.
449  char host[NI_MAXHOST];
450  char serv[NI_MAXSERV];
451 
452  // We use NI_NUMERICSERV to ensure a port number comes out.
453  // Otherwise, we could get a (likely erroneous) string description of
454  // what the network stack *thinks* the port is used for.
455  int ne = getnameinfo(sp, namelen, host, sizeof(host), serv,
456  sizeof(serv), NI_NUMERICSERV);
457  // See comment for above error.
458  if (ne) return "<error@name: " + std::string(gai_strerror(ne)) + ">";
459 
460  auto id = std::to_string(this->id);
461  return id + std::string("!") + host + std::string(":") + serv;
462 }
463 
464 void Connection::Read(ssize_t nread, const uv_buf_t *buf)
465 {
466  assert(buf != nullptr);
467 
468  // Did the connection hang up? If so, de-pool it.
469  // De-pooling the connection will usually lead to the connection being
470  // destroyed.
471  if (nread == UV_EOF) {
472  this->Depool();
473  return;
474  }
475 
476  // Did we hit any other read errors? Also de-pool, but log the error.
477  if (nread < 0) {
478  Debug() << "Error on" << Name() << "-" << uv_err_name(nread)
479  << std::endl;
480  this->Depool();
481  return;
482  }
483 
484  // Make sure we actually have some data to read!
485  if (buf->base == nullptr) return;
486 
487  // Everything looks okay for reading.
488  auto cmds = this->tokeniser.Feed(std::string(buf->base, nread));
489  for (auto cmd : cmds) {
490  if (cmd.empty()) continue;
491 
492  Response res = RunCommand(cmd);
493  this->Respond(res);
494  }
495 
496  delete[] buf->base;
497 }
498 
499 Response Connection::RunCommand(const std::vector<std::string> &cmd)
500 {
501  // First of all, figure out what the tag of this command is.
502  // The first word is always the tag.
503  auto tag = cmd[0];
504  if (cmd.size() <= 1) return Response::Invalid(tag, MSG_CMD_SHORT);
505 
506  // The next words are the actual command, and any other arguments.
507  auto word = cmd[1];
508  auto nargs = cmd.size() - 2;
509 
510  if (nargs == 0) {
511  if ("play" == word) return this->player.SetPlaying(tag, true);
512  if ("stop" == word) return this->player.SetPlaying(tag, false);
513  if ("end" == word) return this->player.End(tag);
514  if ("eject" == word) return this->player.Eject(tag);
515  if ("dump" == word) return this->player.Dump(id, tag);
516  } else if (nargs == 1) {
517  if ("fload" == word) return this->player.Load(tag, cmd[2]);
518  if ("pos" == word) return this->player.Pos(tag, cmd[2]);
519  }
520 
521  return Response::Invalid(tag, MSG_CMD_INVALID);
522 }
523 
525 {
526  auto req = new uv_shutdown_t;
527  assert(req != nullptr);
528 
529  req->data = this;
530 
531  uv_shutdown(req, reinterpret_cast<uv_stream_t *>(this->tcp),
533 }
534 
536 {
537  this->parent.Remove(this->id);
538 }
Declarations of the playd Error exception set.
void UvShutdownCallback(uv_shutdown_t *handle, int status)
The callback fired when a client is shut down.
Definition: io.cpp:146
const std::string MSG_IO_CANNOT_ALLOC
Message shown when allocating an IO object fails.
Definition: messages.h:118
const std::string MSG_CMD_SHORT
Message shown when the CommandHandler receives an under-length command.
Definition: messages.h:32
size_t NextConnectionID()
Acquires the next available connection ID.
Definition: io.cpp:223
bool Update()
Instructs the Player to perform a cycle of work.
Definition: player.cpp:41
A response.
Definition: response.hpp:23
void UvWriteCallback(uv_write_t *req, int status)
The callback fired when a response has been sent to a client.
Definition: io.cpp:92
void Read(ssize_t nread, const uv_buf_t *buf)
Processes a data read on this connection.
Definition: io.cpp:464
Player & player
The Player to which finished commands should be sent.
Definition: io.hpp:256
std::string Pack() const
Packs the Response, converting it to a BAPS3 protocol message.
Definition: response.cpp:44
void UvAlloc(uv_handle_t *, size_t suggested_size, uv_buf_t *buf)
The function used to allocate and initialise buffers for client reading.
Definition: io.cpp:51
Response Load(const std::string &tag, const std::string &path)
Loads a file.
Definition: player.cpp:122
void UvSigintCallback(uv_signal_t *handle, int signum)
The callback fired when SIGINT occurs.
Definition: io.cpp:129
void UpdatePlayer()
Performs a player update cycle.
Definition: io.cpp:274
Constant human-readable messages used within playd.
std::vector< size_t > free_list
A list of free 1-indexed slots inside pool.
Definition: io.hpp:118
void UvUpdateTimerCallback(uv_timer_t *handle)
The callback fired when the update timer fires.
Definition: io.cpp:115
Declaration of the I/O classes used in playd.
IoCore & parent
The pool on which this connection is running.
Definition: io.hpp:247
void Broadcast(const Response &response) const
Sends the given response to all connections.
Definition: io.cpp:316
void InitSignals()
Initialises playd&#39;s signal handling.
Definition: io.cpp:372
~Connection()
Destructs a Connection.
Definition: io.cpp:398
static const std::string NOREQUEST
The tag for unsolicited messages (not from responses).
Definition: response.hpp:27
static Response Success(const std::string &tag)
Shortcut for constructing a final response to a successful request.
Definition: response.cpp:49
uv_signal_t sigint
The libuv handle for the Ctrl-C signal.
Definition: io.hpp:107
Player & player
The player.
Definition: io.hpp:111
Response Eject(const std::string &tag)
Ejects the current loaded song, if any.
Definition: player.cpp:85
std::string Name()
Retrieves a name for this connection.
Definition: io.cpp:429
void Unicast(size_t id, const Response &response) const
Sends the given response to the identified connection.
Definition: io.cpp:327
void Respond(size_t id, const Response &response) const override
Outputs a response.
Definition: io.cpp:305
void Depool()
Removes this connection from its connection pool.
Definition: io.cpp:535
void Accept(uv_stream_t *server)
Accepts a new connection.
Definition: io.cpp:190
void UvReadCallback(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
The callback fired when some bytes are read from a client connection.
Definition: io.cpp:65
void InitAcceptor(const std::string &address, const std::string &port)
Initialises a TCP acceptor on the given address and port.
Definition: io.cpp:349
void Run(const std::string &host, const std::string &port)
Runs the reactor.
Definition: io.cpp:174
void UvListenCallback(uv_stream_t *server, int status)
The callback fired when a new client connection is acquired by the listener.
Definition: io.cpp:80
uv_tcp_t server
The libuv handle for the TCP server.
Definition: io.hpp:108
const std::string MSG_CMD_INVALID
Message shown when the CommandHandler receives an invalid command.
Definition: messages.h:36
Response Pos(const std::string &tag, const std::string &pos_str)
Seeks to a given position in the current file.
Definition: player.cpp:155
std::vector< std::vector< std::string > > Feed(const std::string &raw)
Feeds a string into a Tokeniser.
Definition: tokeniser.cpp:24
void InitUpdateTimer()
Sets up a periodic timer to run the playd update loop.
Definition: io.cpp:338
void UvCloseCallback(uv_handle_t *handle)
The callback fired when a client connection closes.
Definition: io.cpp:58
const std::string MSG_TOO_MANY_CONNS
Message shown when too many simultaneous connections are launched.
Definition: messages.h:121
Declaration of the Player class, and associated types.
Response Dump(size_t id, const std::string &tag) const
Dumps the current player state to the given ID.
Definition: player.cpp:65
std::vector< std::shared_ptr< Connection > > pool
The set of connections inside this IoCore.
Definition: io.hpp:114
Response RunCommand(const std::vector< std::string > &msg)
Handles a tokenised command line.
Definition: io.cpp:499
void ExpandPool()
Adds a new connection slot to the connection pool.
Definition: io.cpp:243
const std::string MSG_OHAI_BIFROST
The protocol name and version.
Definition: messages.h:25
Declaration of classes pertaining to responses to the client.
Response SetPlaying(const std::string &tag, bool playing)
Tells the audio file to start or stop playing.
Definition: player.cpp:188
A network error.
Definition: errors.hpp:105
Server sending its role.
Class for telling the human what playd is doing.
Definition: errors.hpp:133
An Error signifying that playd has hit an internal snag.
Definition: errors.hpp:60
Connection(IoCore &parent, uv_tcp_t *tcp, Player &player, size_t id)
Constructs a Connection.
Definition: io.cpp:392
Response End(const std::string &tag)
Ends a file, stopping and rewinding.
Definition: player.cpp:104
uv_tcp_t * tcp
The libuv handle for the TCP connection.
Definition: io.hpp:250
void Shutdown()
Shuts down the IoCore by terminating all IO loop tasks.
Definition: io.cpp:280
IoCore(Player &player)
Constructs an IoCore.
Definition: io.cpp:170
void Shutdown()
Gracefully shuts this connection down.
Definition: io.cpp:524
Tokeniser tokeniser
The Tokeniser to which data read on this connection should be sent.
Definition: io.hpp:253
A TCP connection from a client.
Definition: io.hpp:186
uv_loop_t * loop
The loop this IoCore is using.
Definition: io.hpp:106
A Player contains a loaded audio file and a command API for manipulating it.
Definition: player.hpp:31
Server starting up.
void Respond(const Response &response)
Emits a Response via this Connection.
Definition: io.cpp:404
void Remove(size_t id)
Removes a connection.
Definition: io.cpp:260
const std::string MSG_OHAI_PLAYD
The playd name and version.
Definition: messages.h:22
The IO core, which services input, routes responses, and executes the Player update routine periodica...
Definition: io.hpp:39
static const uint16_t PLAYER_UPDATE_PERIOD
The period between player updates.
Definition: io.hpp:104
static Response Invalid(const std::string &tag, const std::string &msg)
Shortcut for constructing a final response to a invalid request.
Definition: response.cpp:56
uv_timer_t updater
The libuv handle for the update timer.
Definition: io.hpp:109