udp.c
Go to the documentation of this file.
1 /*
2  * UDP prototype streaming system
3  * Copyright (c) 2000, 2001, 2002 Fabrice Bellard
4  *
5  * This file is part of FFmpeg.
6  *
7  * FFmpeg is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * FFmpeg is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with FFmpeg; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20  */
21 
22 /**
23  * @file
24  * UDP protocol
25  */
26 
27 #define _BSD_SOURCE /* Needed for using struct ip_mreq with recent glibc */
28 
29 #include "avformat.h"
30 #include "avio_internal.h"
31 #include "libavutil/parseutils.h"
32 #include "libavutil/fifo.h"
33 #include "libavutil/intreadwrite.h"
34 #include "libavutil/avstring.h"
35 #include "libavutil/opt.h"
36 #include "libavutil/log.h"
37 #include "libavutil/time.h"
38 #include "internal.h"
39 #include "network.h"
40 #include "os_support.h"
41 #include "url.h"
42 
43 #if HAVE_PTHREAD_CANCEL
44 #include <pthread.h>
45 #endif
46 
47 #ifndef HAVE_PTHREAD_CANCEL
48 #define HAVE_PTHREAD_CANCEL 0
49 #endif
50 
51 #ifndef IPV6_ADD_MEMBERSHIP
52 #define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
53 #define IPV6_DROP_MEMBERSHIP IPV6_LEAVE_GROUP
54 #endif
55 
56 #define UDP_TX_BUF_SIZE 32768
57 #define UDP_MAX_PKT_SIZE 65536
58 
59 typedef struct {
60  const AVClass *class;
61  int udp_fd;
62  int ttl;
68  struct sockaddr_storage dest_addr;
71 
72  /* Circular Buffer variables for use in UDP receive code */
76 #if HAVE_PTHREAD_CANCEL
77  pthread_t circular_buffer_thread;
79  pthread_cond_t cond;
80  int thread_started;
81 #endif
84  char *local_addr;
86  int timeout;
87 } UDPContext;
88 
89 #define OFFSET(x) offsetof(UDPContext, x)
90 #define D AV_OPT_FLAG_DECODING_PARAM
91 #define E AV_OPT_FLAG_ENCODING_PARAM
92 static const AVOption options[] = {
93 {"buffer_size", "Socket buffer size in bytes", OFFSET(buffer_size), AV_OPT_TYPE_INT, {.i64 = 0}, 0, INT_MAX, D|E },
94 {"localport", "Set local port to bind to", OFFSET(local_port), AV_OPT_TYPE_INT, {.i64 = 0}, 0, INT_MAX, D|E },
95 {"localaddr", "Choose local IP address", OFFSET(local_addr), AV_OPT_TYPE_STRING, {.str = ""}, 0, 0, D|E },
96 {"pkt_size", "Set size of UDP packets", OFFSET(packet_size), AV_OPT_TYPE_INT, {.i64 = 1472}, 0, INT_MAX, D|E },
97 {"reuse", "Explicitly allow or disallow reusing UDP sockets", OFFSET(reuse_socket), AV_OPT_TYPE_INT, {.i64 = 0}, 0, 1, D|E },
98 {"ttl", "Set the time to live value (for multicast only)", OFFSET(ttl), AV_OPT_TYPE_INT, {.i64 = 16}, 0, INT_MAX, E },
99 {"connect", "Should connect() be called on socket", OFFSET(is_connected), AV_OPT_TYPE_INT, {.i64 = 0}, 0, 1, D|E },
100 /* TODO 'sources', 'block' option */
101 {"fifo_size", "Set the UDP receiving circular buffer size, expressed as a number of packets with size of 188 bytes", OFFSET(circular_buffer_size), AV_OPT_TYPE_INT, {.i64 = 7*4096}, 0, INT_MAX, D },
102 {"overrun_nonfatal", "Survive in case of UDP receiving circular buffer overrun", OFFSET(overrun_nonfatal), AV_OPT_TYPE_INT, {.i64 = 0}, 0, 1, D },
103 {"timeout", "In read mode: if no data arrived in more than this time interval, raise error", OFFSET(timeout), AV_OPT_TYPE_INT, {.i64 = 0}, 0, INT_MAX, D },
104 {NULL}
105 };
106 
107 static const AVClass udp_context_class = {
108  .class_name = "udp",
109  .item_name = av_default_item_name,
110  .option = options,
111  .version = LIBAVUTIL_VERSION_INT,
112 };
113 
114 static void log_net_error(void *ctx, int level, const char* prefix)
115 {
116  char errbuf[100];
117  av_strerror(ff_neterrno(), errbuf, sizeof(errbuf));
118  av_log(ctx, level, "%s: %s\n", prefix, errbuf);
119 }
120 
121 static int udp_set_multicast_ttl(int sockfd, int mcastTTL,
122  struct sockaddr *addr)
123 {
124 #ifdef IP_MULTICAST_TTL
125  if (addr->sa_family == AF_INET) {
126  if (setsockopt(sockfd, IPPROTO_IP, IP_MULTICAST_TTL, &mcastTTL, sizeof(mcastTTL)) < 0) {
127  log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_MULTICAST_TTL)");
128  return -1;
129  }
130  }
131 #endif
132 #if defined(IPPROTO_IPV6) && defined(IPV6_MULTICAST_HOPS)
133  if (addr->sa_family == AF_INET6) {
134  if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, &mcastTTL, sizeof(mcastTTL)) < 0) {
135  log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IPV6_MULTICAST_HOPS)");
136  return -1;
137  }
138  }
139 #endif
140  return 0;
141 }
142 
143 static int udp_join_multicast_group(int sockfd, struct sockaddr *addr)
144 {
145 #ifdef IP_ADD_MEMBERSHIP
146  if (addr->sa_family == AF_INET) {
147  struct ip_mreq mreq;
148 
149  mreq.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr;
150  mreq.imr_interface.s_addr= INADDR_ANY;
151  if (setsockopt(sockfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (const void *)&mreq, sizeof(mreq)) < 0) {
152  log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_ADD_MEMBERSHIP)");
153  return -1;
154  }
155  }
156 #endif
157 #if HAVE_STRUCT_IPV6_MREQ && defined(IPPROTO_IPV6)
158  if (addr->sa_family == AF_INET6) {
159  struct ipv6_mreq mreq6;
160 
161  memcpy(&mreq6.ipv6mr_multiaddr, &(((struct sockaddr_in6 *)addr)->sin6_addr), sizeof(struct in6_addr));
162  mreq6.ipv6mr_interface= 0;
163  if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, &mreq6, sizeof(mreq6)) < 0) {
164  log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IPV6_ADD_MEMBERSHIP)");
165  return -1;
166  }
167  }
168 #endif
169  return 0;
170 }
171 
172 static int udp_leave_multicast_group(int sockfd, struct sockaddr *addr)
173 {
174 #ifdef IP_DROP_MEMBERSHIP
175  if (addr->sa_family == AF_INET) {
176  struct ip_mreq mreq;
177 
178  mreq.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr;
179  mreq.imr_interface.s_addr= INADDR_ANY;
180  if (setsockopt(sockfd, IPPROTO_IP, IP_DROP_MEMBERSHIP, (const void *)&mreq, sizeof(mreq)) < 0) {
181  log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_DROP_MEMBERSHIP)");
182  return -1;
183  }
184  }
185 #endif
186 #if HAVE_STRUCT_IPV6_MREQ && defined(IPPROTO_IPV6)
187  if (addr->sa_family == AF_INET6) {
188  struct ipv6_mreq mreq6;
189 
190  memcpy(&mreq6.ipv6mr_multiaddr, &(((struct sockaddr_in6 *)addr)->sin6_addr), sizeof(struct in6_addr));
191  mreq6.ipv6mr_interface= 0;
192  if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_DROP_MEMBERSHIP, &mreq6, sizeof(mreq6)) < 0) {
193  log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IPV6_DROP_MEMBERSHIP)");
194  return -1;
195  }
196  }
197 #endif
198  return 0;
199 }
200 
201 static struct addrinfo* udp_resolve_host(const char *hostname, int port,
202  int type, int family, int flags)
203 {
204  struct addrinfo hints = { 0 }, *res = 0;
205  int error;
206  char sport[16];
207  const char *node = 0, *service = "0";
208 
209  if (port > 0) {
210  snprintf(sport, sizeof(sport), "%d", port);
211  service = sport;
212  }
213  if ((hostname) && (hostname[0] != '\0') && (hostname[0] != '?')) {
214  node = hostname;
215  }
216  hints.ai_socktype = type;
217  hints.ai_family = family;
218  hints.ai_flags = flags;
219  if ((error = getaddrinfo(node, service, &hints, &res))) {
220  res = NULL;
221  av_log(NULL, AV_LOG_ERROR, "udp_resolve_host: %s\n", gai_strerror(error));
222  }
223 
224  return res;
225 }
226 
227 static int udp_set_multicast_sources(int sockfd, struct sockaddr *addr,
228  int addr_len, char **sources,
229  int nb_sources, int include)
230 {
231 #if HAVE_STRUCT_GROUP_SOURCE_REQ && defined(MCAST_BLOCK_SOURCE) && !defined(_WIN32)
232  /* These ones are available in the microsoft SDK, but don't seem to work
233  * as on linux, so just prefer the v4-only approach there for now. */
234  int i;
235  for (i = 0; i < nb_sources; i++) {
236  struct group_source_req mreqs;
237  int level = addr->sa_family == AF_INET ? IPPROTO_IP : IPPROTO_IPV6;
238  struct addrinfo *sourceaddr = udp_resolve_host(sources[i], 0,
239  SOCK_DGRAM, AF_UNSPEC,
241  if (!sourceaddr)
242  return AVERROR(ENOENT);
243 
244  mreqs.gsr_interface = 0;
245  memcpy(&mreqs.gsr_group, addr, addr_len);
246  memcpy(&mreqs.gsr_source, sourceaddr->ai_addr, sourceaddr->ai_addrlen);
247  freeaddrinfo(sourceaddr);
248 
249  if (setsockopt(sockfd, level,
250  include ? MCAST_JOIN_SOURCE_GROUP : MCAST_BLOCK_SOURCE,
251  (const void *)&mreqs, sizeof(mreqs)) < 0) {
252  if (include)
253  log_net_error(NULL, AV_LOG_ERROR, "setsockopt(MCAST_JOIN_SOURCE_GROUP)");
254  else
255  log_net_error(NULL, AV_LOG_ERROR, "setsockopt(MCAST_BLOCK_SOURCE)");
256  return ff_neterrno();
257  }
258  }
259 #elif HAVE_STRUCT_IP_MREQ_SOURCE && defined(IP_BLOCK_SOURCE)
260  int i;
261  if (addr->sa_family != AF_INET) {
263  "Setting multicast sources only supported for IPv4\n");
264  return AVERROR(EINVAL);
265  }
266  for (i = 0; i < nb_sources; i++) {
267  struct ip_mreq_source mreqs;
268  struct addrinfo *sourceaddr = udp_resolve_host(sources[i], 0,
269  SOCK_DGRAM, AF_UNSPEC,
271  if (!sourceaddr)
272  return AVERROR(ENOENT);
273  if (sourceaddr->ai_addr->sa_family != AF_INET) {
274  freeaddrinfo(sourceaddr);
275  av_log(NULL, AV_LOG_ERROR, "%s is of incorrect protocol family\n",
276  sources[i]);
277  return AVERROR(EINVAL);
278  }
279 
280  mreqs.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr;
281  mreqs.imr_interface.s_addr = INADDR_ANY;
282  mreqs.imr_sourceaddr.s_addr = ((struct sockaddr_in *)sourceaddr->ai_addr)->sin_addr.s_addr;
283  freeaddrinfo(sourceaddr);
284 
285  if (setsockopt(sockfd, IPPROTO_IP,
286  include ? IP_ADD_SOURCE_MEMBERSHIP : IP_BLOCK_SOURCE,
287  (const void *)&mreqs, sizeof(mreqs)) < 0) {
288  if (include)
289  log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_ADD_SOURCE_MEMBERSHIP)");
290  else
291  log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_BLOCK_SOURCE)");
292  return ff_neterrno();
293  }
294  }
295 #else
296  return AVERROR(ENOSYS);
297 #endif
298  return 0;
299 }
300 static int udp_set_url(struct sockaddr_storage *addr,
301  const char *hostname, int port)
302 {
303  struct addrinfo *res0;
304  int addr_len;
305 
306  res0 = udp_resolve_host(hostname, port, SOCK_DGRAM, AF_UNSPEC, 0);
307  if (res0 == 0) return AVERROR(EIO);
308  memcpy(addr, res0->ai_addr, res0->ai_addrlen);
309  addr_len = res0->ai_addrlen;
310  freeaddrinfo(res0);
311 
312  return addr_len;
313 }
314 
315 static int udp_socket_create(UDPContext *s, struct sockaddr_storage *addr,
316  socklen_t *addr_len, const char *localaddr)
317 {
318  int udp_fd = -1;
319  struct addrinfo *res0 = NULL, *res = NULL;
320  int family = AF_UNSPEC;
321 
322  if (((struct sockaddr *) &s->dest_addr)->sa_family)
323  family = ((struct sockaddr *) &s->dest_addr)->sa_family;
324  res0 = udp_resolve_host(localaddr[0] ? localaddr : NULL, s->local_port,
325  SOCK_DGRAM, family, AI_PASSIVE);
326  if (res0 == 0)
327  goto fail;
328  for (res = res0; res; res=res->ai_next) {
329  udp_fd = socket(res->ai_family, SOCK_DGRAM, 0);
330  if (udp_fd != -1) break;
331  log_net_error(NULL, AV_LOG_ERROR, "socket");
332  }
333 
334  if (udp_fd < 0)
335  goto fail;
336 
337  memcpy(addr, res->ai_addr, res->ai_addrlen);
338  *addr_len = res->ai_addrlen;
339 
340  freeaddrinfo(res0);
341 
342  return udp_fd;
343 
344  fail:
345  if (udp_fd >= 0)
346  closesocket(udp_fd);
347  if(res0)
348  freeaddrinfo(res0);
349  return -1;
350 }
351 
352 static int udp_port(struct sockaddr_storage *addr, int addr_len)
353 {
354  char sbuf[sizeof(int)*3+1];
355  int error;
356 
357  if ((error = getnameinfo((struct sockaddr *)addr, addr_len, NULL, 0, sbuf, sizeof(sbuf), NI_NUMERICSERV)) != 0) {
358  av_log(NULL, AV_LOG_ERROR, "getnameinfo: %s\n", gai_strerror(error));
359  return -1;
360  }
361 
362  return strtol(sbuf, NULL, 10);
363 }
364 
365 
366 /**
367  * If no filename is given to av_open_input_file because you want to
368  * get the local port first, then you must call this function to set
369  * the remote server address.
370  *
371  * url syntax: udp://host:port[?option=val...]
372  * option: 'ttl=n' : set the ttl value (for multicast only)
373  * 'localport=n' : set the local port
374  * 'pkt_size=n' : set max packet size
375  * 'reuse=1' : enable reusing the socket
376  * 'overrun_nonfatal=1': survive in case of circular buffer overrun
377  *
378  * @param h media file context
379  * @param uri of the remote server
380  * @return zero if no error.
381  */
382 int ff_udp_set_remote_url(URLContext *h, const char *uri)
383 {
384  UDPContext *s = h->priv_data;
385  char hostname[256], buf[10];
386  int port;
387  const char *p;
388 
389  av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &port, NULL, 0, uri);
390 
391  /* set the destination address */
392  s->dest_addr_len = udp_set_url(&s->dest_addr, hostname, port);
393  if (s->dest_addr_len < 0) {
394  return AVERROR(EIO);
395  }
396  s->is_multicast = ff_is_multicast_address((struct sockaddr*) &s->dest_addr);
397  p = strchr(uri, '?');
398  if (p) {
399  if (av_find_info_tag(buf, sizeof(buf), "connect", p)) {
400  int was_connected = s->is_connected;
401  s->is_connected = strtol(buf, NULL, 10);
402  if (s->is_connected && !was_connected) {
403  if (connect(s->udp_fd, (struct sockaddr *) &s->dest_addr,
404  s->dest_addr_len)) {
405  s->is_connected = 0;
406  log_net_error(h, AV_LOG_ERROR, "connect");
407  return AVERROR(EIO);
408  }
409  }
410  }
411  }
412 
413  return 0;
414 }
415 
416 /**
417  * Return the local port used by the UDP connection
418  * @param h media file context
419  * @return the local port number
420  */
422 {
423  UDPContext *s = h->priv_data;
424  return s->local_port;
425 }
426 
427 /**
428  * Return the udp file handle for select() usage to wait for several RTP
429  * streams at the same time.
430  * @param h media file context
431  */
433 {
434  UDPContext *s = h->priv_data;
435  return s->udp_fd;
436 }
437 
438 #if HAVE_PTHREAD_CANCEL
439 static void *circular_buffer_task( void *_URLContext)
440 {
441  URLContext *h = _URLContext;
442  UDPContext *s = h->priv_data;
443  int old_cancelstate;
444 
445  pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
446  pthread_mutex_lock(&s->mutex);
447  if (ff_socket_nonblock(s->udp_fd, 0) < 0) {
448  av_log(h, AV_LOG_ERROR, "Failed to set blocking mode");
449  s->circular_buffer_error = AVERROR(EIO);
450  goto end;
451  }
452  while(1) {
453  int len;
454 
455  pthread_mutex_unlock(&s->mutex);
456  /* Blocking operations are always cancellation points;
457  see "General Information" / "Thread Cancelation Overview"
458  in Single Unix. */
459  pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_cancelstate);
460  len = recv(s->udp_fd, s->tmp+4, sizeof(s->tmp)-4, 0);
461  pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
462  pthread_mutex_lock(&s->mutex);
463  if (len < 0) {
464  if (ff_neterrno() != AVERROR(EAGAIN) && ff_neterrno() != AVERROR(EINTR)) {
466  goto end;
467  }
468  continue;
469  }
470  AV_WL32(s->tmp, len);
471 
472  if(av_fifo_space(s->fifo) < len + 4) {
473  /* No Space left */
474  if (s->overrun_nonfatal) {
475  av_log(h, AV_LOG_WARNING, "Circular buffer overrun. "
476  "Surviving due to overrun_nonfatal option\n");
477  continue;
478  } else {
479  av_log(h, AV_LOG_ERROR, "Circular buffer overrun. "
480  "To avoid, increase fifo_size URL option. "
481  "To survive in such case, use overrun_nonfatal option\n");
482  s->circular_buffer_error = AVERROR(EIO);
483  goto end;
484  }
485  }
486  av_fifo_generic_write(s->fifo, s->tmp, len+4, NULL);
487  pthread_cond_signal(&s->cond);
488  }
489 
490 end:
491  pthread_cond_signal(&s->cond);
492  pthread_mutex_unlock(&s->mutex);
493  return NULL;
494 }
495 #endif
496 
497 /* put it in UDP context */
498 /* return non zero if error */
499 static int udp_open(URLContext *h, const char *uri, int flags)
500 {
501  char hostname[1024], localaddr[1024] = "";
502  int port, udp_fd = -1, tmp, bind_ret = -1;
503  UDPContext *s = h->priv_data;
504  int is_output;
505  const char *p;
506  char buf[256];
507  struct sockaddr_storage my_addr;
508  socklen_t len;
509  int reuse_specified = 0;
510  int i, include = 0, num_sources = 0;
511  char *sources[32];
512 
513  h->is_streamed = 1;
514 
515  is_output = !(flags & AVIO_FLAG_READ);
516  if (!s->buffer_size) /* if not set explicitly */
517  s->buffer_size = is_output ? UDP_TX_BUF_SIZE : UDP_MAX_PKT_SIZE;
518 
519  p = strchr(uri, '?');
520  if (p) {
521  if (av_find_info_tag(buf, sizeof(buf), "reuse", p)) {
522  char *endptr = NULL;
523  s->reuse_socket = strtol(buf, &endptr, 10);
524  /* assume if no digits were found it is a request to enable it */
525  if (buf == endptr)
526  s->reuse_socket = 1;
527  reuse_specified = 1;
528  }
529  if (av_find_info_tag(buf, sizeof(buf), "overrun_nonfatal", p)) {
530  char *endptr = NULL;
531  s->overrun_nonfatal = strtol(buf, &endptr, 10);
532  /* assume if no digits were found it is a request to enable it */
533  if (buf == endptr)
534  s->overrun_nonfatal = 1;
535  if (!HAVE_PTHREAD_CANCEL)
537  "'overrun_nonfatal' option was set but it is not supported "
538  "on this build (pthread support is required)\n");
539  }
540  if (av_find_info_tag(buf, sizeof(buf), "ttl", p)) {
541  s->ttl = strtol(buf, NULL, 10);
542  }
543  if (av_find_info_tag(buf, sizeof(buf), "localport", p)) {
544  s->local_port = strtol(buf, NULL, 10);
545  }
546  if (av_find_info_tag(buf, sizeof(buf), "pkt_size", p)) {
547  s->packet_size = strtol(buf, NULL, 10);
548  }
549  if (av_find_info_tag(buf, sizeof(buf), "buffer_size", p)) {
550  s->buffer_size = strtol(buf, NULL, 10);
551  }
552  if (av_find_info_tag(buf, sizeof(buf), "connect", p)) {
553  s->is_connected = strtol(buf, NULL, 10);
554  }
555  if (av_find_info_tag(buf, sizeof(buf), "fifo_size", p)) {
556  s->circular_buffer_size = strtol(buf, NULL, 10);
557  if (!HAVE_PTHREAD_CANCEL)
559  "'circular_buffer_size' option was set but it is not supported "
560  "on this build (pthread support is required)\n");
561  }
562  if (av_find_info_tag(buf, sizeof(buf), "localaddr", p)) {
563  av_strlcpy(localaddr, buf, sizeof(localaddr));
564  }
565  if (av_find_info_tag(buf, sizeof(buf), "sources", p))
566  include = 1;
567  if (include || av_find_info_tag(buf, sizeof(buf), "block", p)) {
568  char *source_start;
569 
570  source_start = buf;
571  while (1) {
572  char *next = strchr(source_start, ',');
573  if (next)
574  *next = '\0';
575  sources[num_sources] = av_strdup(source_start);
576  if (!sources[num_sources])
577  goto fail;
578  source_start = next + 1;
579  num_sources++;
580  if (num_sources >= FF_ARRAY_ELEMS(sources) || !next)
581  break;
582  }
583  }
584  if (!is_output && av_find_info_tag(buf, sizeof(buf), "timeout", p))
585  s->timeout = strtol(buf, NULL, 10);
586  }
587  /* handling needed to support options picking from both AVOption and URL */
588  s->circular_buffer_size *= 188;
590  h->rw_timeout = s->timeout;
591 
592  /* fill the dest addr */
593  av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &port, NULL, 0, uri);
594 
595  /* XXX: fix av_url_split */
596  if (hostname[0] == '\0' || hostname[0] == '?') {
597  /* only accepts null hostname if input */
598  if (!(flags & AVIO_FLAG_READ))
599  goto fail;
600  } else {
601  if (ff_udp_set_remote_url(h, uri) < 0)
602  goto fail;
603  }
604 
605  if ((s->is_multicast || !s->local_port) && (h->flags & AVIO_FLAG_READ))
606  s->local_port = port;
607  udp_fd = udp_socket_create(s, &my_addr, &len, localaddr[0] ? localaddr : s->local_addr);
608  if (udp_fd < 0)
609  goto fail;
610 
611  /* Follow the requested reuse option, unless it's multicast in which
612  * case enable reuse unless explicitly disabled.
613  */
614  if (s->reuse_socket || (s->is_multicast && !reuse_specified)) {
615  s->reuse_socket = 1;
616  if (setsockopt (udp_fd, SOL_SOCKET, SO_REUSEADDR, &(s->reuse_socket), sizeof(s->reuse_socket)) != 0)
617  goto fail;
618  }
619 
620  /* If multicast, try binding the multicast address first, to avoid
621  * receiving UDP packets from other sources aimed at the same UDP
622  * port. This fails on windows. This makes sending to the same address
623  * using sendto() fail, so only do it if we're opened in read-only mode. */
624  if (s->is_multicast && !(h->flags & AVIO_FLAG_WRITE)) {
625  bind_ret = bind(udp_fd,(struct sockaddr *)&s->dest_addr, len);
626  }
627  /* bind to the local address if not multicast or if the multicast
628  * bind failed */
629  /* the bind is needed to give a port to the socket now */
630  if (bind_ret < 0 && bind(udp_fd,(struct sockaddr *)&my_addr, len) < 0) {
631  log_net_error(h, AV_LOG_ERROR, "bind failed");
632  goto fail;
633  }
634 
635  len = sizeof(my_addr);
636  getsockname(udp_fd, (struct sockaddr *)&my_addr, &len);
637  s->local_port = udp_port(&my_addr, len);
638 
639  if (s->is_multicast) {
640  if (h->flags & AVIO_FLAG_WRITE) {
641  /* output */
642  if (udp_set_multicast_ttl(udp_fd, s->ttl, (struct sockaddr *)&s->dest_addr) < 0)
643  goto fail;
644  }
645  if (h->flags & AVIO_FLAG_READ) {
646  /* input */
647  if (num_sources == 0 || !include) {
648  if (udp_join_multicast_group(udp_fd, (struct sockaddr *)&s->dest_addr) < 0)
649  goto fail;
650 
651  if (num_sources) {
652  if (udp_set_multicast_sources(udp_fd, (struct sockaddr *)&s->dest_addr, s->dest_addr_len, sources, num_sources, 0) < 0)
653  goto fail;
654  }
655  } else if (include && num_sources) {
656  if (udp_set_multicast_sources(udp_fd, (struct sockaddr *)&s->dest_addr, s->dest_addr_len, sources, num_sources, 1) < 0)
657  goto fail;
658  } else {
659  av_log(NULL, AV_LOG_ERROR, "invalid udp settings: inclusive multicast but no sources given\n");
660  goto fail;
661  }
662  }
663  }
664 
665  if (is_output) {
666  /* limit the tx buf size to limit latency */
667  tmp = s->buffer_size;
668  if (setsockopt(udp_fd, SOL_SOCKET, SO_SNDBUF, &tmp, sizeof(tmp)) < 0) {
669  log_net_error(h, AV_LOG_ERROR, "setsockopt(SO_SNDBUF)");
670  goto fail;
671  }
672  } else {
673  /* set udp recv buffer size to the largest possible udp packet size to
674  * avoid losing data on OSes that set this too low by default. */
675  tmp = s->buffer_size;
676  if (setsockopt(udp_fd, SOL_SOCKET, SO_RCVBUF, &tmp, sizeof(tmp)) < 0) {
677  log_net_error(h, AV_LOG_WARNING, "setsockopt(SO_RECVBUF)");
678  }
679  /* make the socket non-blocking */
680  ff_socket_nonblock(udp_fd, 1);
681  }
682  if (s->is_connected) {
683  if (connect(udp_fd, (struct sockaddr *) &s->dest_addr, s->dest_addr_len)) {
684  log_net_error(h, AV_LOG_ERROR, "connect");
685  goto fail;
686  }
687  }
688 
689  for (i = 0; i < num_sources; i++)
690  av_freep(&sources[i]);
691 
692  s->udp_fd = udp_fd;
693 
694 #if HAVE_PTHREAD_CANCEL
695  if (!is_output && s->circular_buffer_size) {
696  int ret;
697 
698  /* start the task going */
700  ret = pthread_mutex_init(&s->mutex, NULL);
701  if (ret != 0) {
702  av_log(h, AV_LOG_ERROR, "pthread_mutex_init failed : %s\n", strerror(ret));
703  goto fail;
704  }
705  ret = pthread_cond_init(&s->cond, NULL);
706  if (ret != 0) {
707  av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", strerror(ret));
708  goto cond_fail;
709  }
710  ret = pthread_create(&s->circular_buffer_thread, NULL, circular_buffer_task, h);
711  if (ret != 0) {
712  av_log(h, AV_LOG_ERROR, "pthread_create failed : %s\n", strerror(ret));
713  goto thread_fail;
714  }
715  s->thread_started = 1;
716  }
717 #endif
718 
719  return 0;
720 #if HAVE_PTHREAD_CANCEL
721  thread_fail:
722  pthread_cond_destroy(&s->cond);
723  cond_fail:
724  pthread_mutex_destroy(&s->mutex);
725 #endif
726  fail:
727  if (udp_fd >= 0)
728  closesocket(udp_fd);
729  av_fifo_free(s->fifo);
730  for (i = 0; i < num_sources; i++)
731  av_freep(&sources[i]);
732  return AVERROR(EIO);
733 }
734 
735 static int udp_read(URLContext *h, uint8_t *buf, int size)
736 {
737  UDPContext *s = h->priv_data;
738  int ret;
739  int avail, nonblock = h->flags & AVIO_FLAG_NONBLOCK;
740 
741 #if HAVE_PTHREAD_CANCEL
742  if (s->fifo) {
743  pthread_mutex_lock(&s->mutex);
744  do {
745  avail = av_fifo_size(s->fifo);
746  if (avail) { // >=size) {
747  uint8_t tmp[4];
748 
749  av_fifo_generic_read(s->fifo, tmp, 4, NULL);
750  avail= AV_RL32(tmp);
751  if(avail > size){
752  av_log(h, AV_LOG_WARNING, "Part of datagram lost due to insufficient buffer size\n");
753  avail= size;
754  }
755 
756  av_fifo_generic_read(s->fifo, buf, avail, NULL);
757  av_fifo_drain(s->fifo, AV_RL32(tmp) - avail);
758  pthread_mutex_unlock(&s->mutex);
759  return avail;
760  } else if(s->circular_buffer_error){
761  int err = s->circular_buffer_error;
762  pthread_mutex_unlock(&s->mutex);
763  return err;
764  } else if(nonblock) {
765  pthread_mutex_unlock(&s->mutex);
766  return AVERROR(EAGAIN);
767  }
768  else {
769  /* FIXME: using the monotonic clock would be better,
770  but it does not exist on all supported platforms. */
771  int64_t t = av_gettime() + 100000;
772  struct timespec tv = { .tv_sec = t / 1000000,
773  .tv_nsec = (t % 1000000) * 1000 };
774  if (pthread_cond_timedwait(&s->cond, &s->mutex, &tv) < 0) {
775  pthread_mutex_unlock(&s->mutex);
776  return AVERROR(errno == ETIMEDOUT ? EAGAIN : errno);
777  }
778  nonblock = 1;
779  }
780  } while( 1);
781  }
782 #endif
783 
784  if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
785  ret = ff_network_wait_fd(s->udp_fd, 0);
786  if (ret < 0)
787  return ret;
788  }
789  ret = recv(s->udp_fd, buf, size, 0);
790 
791  return ret < 0 ? ff_neterrno() : ret;
792 }
793 
794 static int udp_write(URLContext *h, const uint8_t *buf, int size)
795 {
796  UDPContext *s = h->priv_data;
797  int ret;
798 
799  if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
800  ret = ff_network_wait_fd(s->udp_fd, 1);
801  if (ret < 0)
802  return ret;
803  }
804 
805  if (!s->is_connected) {
806  ret = sendto (s->udp_fd, buf, size, 0,
807  (struct sockaddr *) &s->dest_addr,
808  s->dest_addr_len);
809  } else
810  ret = send(s->udp_fd, buf, size, 0);
811 
812  return ret < 0 ? ff_neterrno() : ret;
813 }
814 
815 static int udp_close(URLContext *h)
816 {
817  UDPContext *s = h->priv_data;
818  int ret;
819 
820  if (s->is_multicast && (h->flags & AVIO_FLAG_READ))
821  udp_leave_multicast_group(s->udp_fd, (struct sockaddr *)&s->dest_addr);
822  closesocket(s->udp_fd);
823 #if HAVE_PTHREAD_CANCEL
824  if (s->thread_started) {
825  pthread_cancel(s->circular_buffer_thread);
826  ret = pthread_join(s->circular_buffer_thread, NULL);
827  if (ret != 0)
828  av_log(h, AV_LOG_ERROR, "pthread_join(): %s\n", strerror(ret));
829  pthread_mutex_destroy(&s->mutex);
830  pthread_cond_destroy(&s->cond);
831  }
832 #endif
833  av_fifo_free(s->fifo);
834  return 0;
835 }
836 
838  .name = "udp",
839  .url_open = udp_open,
840  .url_read = udp_read,
841  .url_write = udp_write,
842  .url_close = udp_close,
843  .url_get_file_handle = udp_get_file_handle,
844  .priv_data_size = sizeof(UDPContext),
845  .priv_data_class = &udp_context_class,
847 };
static int udp_open(URLContext *h, const char *uri, int flags)
Definition: udp.c:499
void av_url_split(char *proto, int proto_size, char *authorization, int authorization_size, char *hostname, int hostname_size, int *port_ptr, char *path, int path_size, const char *url)
Split a URL string into components.
const char * s
Definition: avisynth_c.h:668
static void log_net_error(void *ctx, int level, const char *prefix)
Definition: udp.c:114
static av_always_inline int pthread_mutex_destroy(pthread_mutex_t *mutex)
Definition: os2threads.h:90
#define URL_PROTOCOL_FLAG_NETWORK
Definition: url.h:35
#define NI_NUMERICSERV
Definition: network.h:180
#define HAVE_PTHREAD_CANCEL
Definition: udp.c:48
AVOption.
Definition: opt.h:251
av_default_item_name
static int udp_set_multicast_sources(int sockfd, struct sockaddr *addr, int addr_len, char **sources, int nb_sources, int include)
Definition: udp.c:227
int is_streamed
true if streamed (no seek possible), default = false
Definition: url.h:48
#define AV_LOG_WARNING
Something somehow does not look correct.
Definition: log.h:154
#define AVIO_FLAG_READ
read-only
Definition: avio.h:332
int64_t rw_timeout
maximum time to wait for (network) read/write operation completion, in mcs
Definition: url.h:51
#define AVIO_FLAG_WRITE
write-only
Definition: avio.h:333
#define AI_PASSIVE
Definition: network.h:156
int flags
Definition: url.h:46
#define FF_ARRAY_ELEMS(a)
#define freeaddrinfo
Definition: network.h:195
#define AI_NUMERICHOST
Definition: network.h:164
static av_always_inline int pthread_cond_destroy(pthread_cond_t *cond)
Definition: os2threads.h:120
char * local_addr
Definition: udp.c:84
int av_fifo_generic_write(AVFifoBuffer *f, void *src, int size, int(*func)(void *, void *, int))
Feed data from a user-supplied callback to an AVFifoBuffer.
void av_freep(void *arg)
Free a memory block which has been allocated with av_malloc(z)() or av_realloc() and set the pointer ...
Definition: mem.c:198
static struct addrinfo * udp_resolve_host(const char *hostname, int port, int type, int family, int flags)
Definition: udp.c:201
#define AV_WL32(p, darg)
Definition: intreadwrite.h:282
const char * class_name
The name of the class; usually it is the same name as the context structure type to which the AVClass...
Definition: log.h:55
HMTX pthread_mutex_t
Definition: os2threads.h:38
uint8_t
AVOptions.
miscellaneous OS support macros and functions.
int ff_udp_get_local_port(URLContext *h)
Return the local port used by the UDP connection.
Definition: udp.c:421
end end
static int udp_port(struct sockaddr_storage *addr, int addr_len)
Definition: udp.c:352
void av_fifo_free(AVFifoBuffer *f)
Free an AVFifoBuffer.
int av_find_info_tag(char *arg, int arg_size, const char *tag1, const char *info)
Attempt to find a specific tag in a URL.
Definition: parseutils.c:647
static av_always_inline int pthread_cond_signal(pthread_cond_t *cond)
Definition: os2threads.h:127
static int udp_close(URLContext *h)
Definition: udp.c:815
int circular_buffer_size
Definition: udp.c:73
int ff_udp_set_remote_url(URLContext *h, const char *uri)
If no filename is given to av_open_input_file because you want to get the local port first...
Definition: udp.c:382
#define closesocket
Definition: ffserver.c:28
#define E
Definition: udp.c:91
int av_fifo_generic_read(AVFifoBuffer *f, void *dest, int buf_size, void(*func)(void *, void *, int))
Feed data from an AVFifoBuffer to a user-supplied callback.
int reuse_socket
Definition: udp.c:66
#define UDP_TX_BUF_SIZE
Definition: udp.c:56
#define D
Definition: udp.c:90
void av_log(void *avcl, int level, const char *fmt,...)
Definition: log.c:246
int dest_addr_len
Definition: udp.c:69
int ff_is_multicast_address(struct sockaddr *addr)
Definition: network.c:202
size_t av_strlcpy(char *dst, const char *src, size_t size)
Copy the string src to dst, but no more than size - 1 bytes, and null-terminate dst.
Definition: avstring.c:82
int timeout
Definition: udp.c:86
int size
#define OFFSET(x)
Definition: udp.c:89
int ai_addrlen
Definition: network.h:119
static const AVOption options[]
Definition: udp.c:92
static int udp_socket_create(UDPContext *s, struct sockaddr_storage *addr, socklen_t *addr_len, const char *localaddr)
Definition: udp.c:315
ret
Definition: avfilter.c:821
uint8_t tmp[UDP_MAX_PKT_SIZE+4]
Definition: udp.c:82
#define ff_neterrno()
Definition: network.h:63
t
Definition: genspecsines3.m:6
int overrun_nonfatal
Definition: udp.c:67
static av_always_inline int pthread_join(pthread_t thread, void **value_ptr)
Definition: os2threads.h:76
static av_always_inline int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr)
Definition: os2threads.h:83
static int udp_get_file_handle(URLContext *h)
Return the udp file handle for select() usage to wait for several RTP streams at the same time...
Definition: udp.c:432
int is_connected
Definition: udp.c:70
#define AV_RL32
FIXME Range Coding of cr are level
Definition: snow.txt:367
static pthread_mutex_t * mutex
Definition: w32pthreads.h:68
Definition: udp.c:59
int ff_socket_nonblock(int socket, int enable)
LIBAVUTIL_VERSION_INT
Definition: eval.c:55
static int udp_leave_multicast_group(int sockfd, struct sockaddr *addr)
Definition: udp.c:172
int ttl
Definition: udp.c:62
int64_t av_gettime(void)
Get the current time in microseconds.
Definition: time.c:39
static av_always_inline int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void *), void *arg)
Definition: os2threads.h:62
NULL
Definition: eval.c:55
#define IPV6_ADD_MEMBERSHIP
Definition: udp.c:52
int av_fifo_space(AVFifoBuffer *f)
Return the amount of space in bytes in the AVFifoBuffer, that is the amount of data you can write int...
char * av_strdup(const char *s)
Duplicate the string s.
Definition: mem.c:220
#define AVIO_FLAG_NONBLOCK
Use non-blocking mode.
Definition: avio.h:351
int local_port
Definition: udp.c:65
int packet_size
Definition: udp.c:85
#define AV_LOG_ERROR
Something went wrong and cannot losslessly be recovered.
Definition: log.h:148
#define IPV6_DROP_MEMBERSHIP
Definition: udp.c:53
a very simple circular buffer FIFO implementation
void * buf
Definition: avisynth_c.h:594
Definition: url.h:41
static int udp_set_url(struct sockaddr_storage *addr, const char *hostname, int port)
Definition: udp.c:300
Describe the class of an AVClass context structure.
Definition: log.h:50
synthesis window for stochastic i
void * priv_data
Definition: url.h:44
#define gai_strerror
Definition: network.h:201
AVFifoBuffer * fifo
Definition: udp.c:74
#define snprintf
Definition: snprintf.h:34
Filter the word “frame” indicates either a video frame or a group of audio as stored in an AVFilterBuffer structure Format for each input and each output the list of supported formats For video that means pixel format For audio that means channel sample they are references to shared objects When the negotiation mechanism computes the intersection of the formats supported at each end of a all references to both lists are replaced with a reference to the intersection And when a single format is eventually chosen for a link amongst the remaining all references to the list are updated That means that if a filter requires that its input and output have the same format amongst a supported all it has to do is use a reference to the same list of formats query_formats can leave some formats unset and return AVERROR(EAGAIN) to cause the negotiation mechanism toagain later.That can be used by filters with complex requirements to use the format negotiated on one link to set the formats supported on another.Buffer references ownership and permissions
misc parsing utilities
#define type
const char * name
Definition: url.h:55
int ai_socktype
Definition: network.h:117
static int flags
Definition: cpu.c:23
static const AVClass udp_context_class
Definition: udp.c:107
int av_strerror(int errnum, char *errbuf, size_t errbuf_size)
Put a description of the AVERROR code errnum in errbuf.
Definition: error.c:53
#define getaddrinfo
Definition: network.h:194
Main libavformat public API header.
URLProtocol ff_udp_protocol
Definition: udp.c:837
int av_fifo_size(AVFifoBuffer *f)
Return the amount of data in bytes in the AVFifoBuffer, that is the amount of data you can read from ...
struct sockaddr_storage dest_addr
Definition: udp.c:68
static int udp_read(URLContext *h, uint8_t *buf, int size)
Definition: udp.c:735
int remaining_in_dg
Definition: udp.c:83
struct addrinfo * ai_next
Definition: network.h:122
static av_always_inline int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr)
Definition: os2threads.h:111
static int udp_set_multicast_ttl(int sockfd, int mcastTTL, struct sockaddr *addr)
Definition: udp.c:121
int udp_fd
Definition: udp.c:61
#define getnameinfo
Definition: network.h:196
AVFifoBuffer * av_fifo_alloc(unsigned int size)
Initialize an AVFifoBuffer.
int len
int circular_buffer_error
Definition: udp.c:75
static av_always_inline int pthread_mutex_unlock(pthread_mutex_t *mutex)
Definition: os2threads.h:104
int buffer_size
Definition: udp.c:63
int is_multicast
Definition: udp.c:64
int ai_flags
Definition: network.h:115
int max_packet_size
if non zero, the stream is packetized with this max packet size
Definition: url.h:47
#define UDP_MAX_PKT_SIZE
Definition: udp.c:57
int ff_network_wait_fd(int fd, int write)
Definition: network.c:144
unbuffered private I/O API
static av_always_inline int pthread_mutex_lock(pthread_mutex_t *mutex)
Definition: os2threads.h:97
static int udp_join_multicast_group(int sockfd, struct sockaddr *addr)
Definition: udp.c:143
void av_fifo_drain(AVFifoBuffer *f, int size)
Discard data from the FIFO.
struct sockaddr * ai_addr
Definition: network.h:120
static int udp_write(URLContext *h, const uint8_t *buf, int size)
Definition: udp.c:794
int ai_family
Definition: network.h:116