diff --git a/doc/protocols.texi b/doc/protocols.texi
index f54600b846..642cde962a 100644
--- a/doc/protocols.texi
+++ b/doc/protocols.texi
@@ -948,6 +948,10 @@ URL to player swf file, compute hash/size automatically.
 @item rtmp_tcurl
 URL of the target stream. Defaults to proto://host[:port]/app.
 
+@item rtmp_window
+Maximum number of bytes sent to the peer without acknowledgement before
+output is throttled. Defaults to 2500000.
+
 @item tcp_nodelay=@var{1|0}
 Set TCP_NODELAY to disable Nagle's algorithm. Default value is 0.
 
diff --git a/libavformat/rtmppkt.c b/libavformat/rtmppkt.c
index a602bf6a96..3fcb4e8f14 100644
--- a/libavformat/rtmppkt.c
+++ b/libavformat/rtmppkt.c
@@ -159,7 +159,10 @@ int ff_rtmp_packet_read(URLContext *h, RTMPPacket *p,
 {
     uint8_t hdr;
 
-    if (ffurl_read(h, &hdr, 1) != 1)
+    int ret = ffurl_read(h, &hdr, 1);
+    if (ret == AVERROR_EOF)
+        return AVERROR_EOF;
+    if (ret != 1)
         return AVERROR(EIO);
 
     return ff_rtmp_packet_read_internal(h, p, chunk_size, prev_pkt,
diff --git a/libavformat/rtmpproto.c b/libavformat/rtmpproto.c
index 98718bc6da..189e1c7d02 100644
--- a/libavformat/rtmpproto.c
+++ b/libavformat/rtmpproto.c
@@ -97,6 +97,8 @@ typedef struct RTMPContext {
     uint32_t      receive_report_size;        ///< number of bytes after which we should report the number of received bytes to the peer
     uint64_t      bytes_read;                 ///< number of bytes read from server
     uint64_t      last_bytes_read;            ///< number of bytes read last reported to server
+    uint64_t      bytes_sent;                 ///< number of bytes written to the peer through rtmp_send_packet()
+    uint64_t      last_bytes_sent;            ///< last acknowledgement sequence number received from the peer (32-bit on the wire)
     uint32_t      last_timestamp;             ///< last timestamp received in a packet
     int           skip_bytes;                 ///< number of bytes to skip from the input FLV stream in the next write call
     int           has_audio;                  ///< presence of audio data
@@ -251,6 +253,9 @@ static int rtmp_send_packet(RTMPContext *rt, RTMPPacket *pkt, int track)
 
     ret = ff_rtmp_packet_write(rt->stream, pkt, rt->out_chunk_size,
                                &rt->prev_pkt[1], &rt->nb_prev_pkt[1]);
+    /* only account for successful writes, using the byte count reported by the writer */
+    if (ret > 0)
+        rt->bytes_sent += ret;
 fail:
     ff_rtmp_packet_destroy(pkt);
     return ret;
@@ -472,7 +477,11 @@ static int read_connect(URLContext *s, RTMPContext *rt)
             ff_rtmp_packet_destroy(&pkt);
             return AVERROR_UNKNOWN;
         } else if (pkt.type == RTMP_PT_BYTES_READ) {
-            av_log(s, AV_LOG_TRACE, "received acknowledgement\n");
+            if (pkt.size >= 4) {
+                rt->last_bytes_sent = AV_RB32(pkt.data);
+                av_log(s, AV_LOG_TRACE, "received acknowledgement (%"PRIu64")\n",
+                       rt->last_bytes_sent);
+            }
         } else if (pkt.type == RTMP_PT_WINDOW_ACK_SIZE) {
             if ((ret = handle_window_ack_size(s, &pkt)) < 0) {
                 ff_rtmp_packet_destroy(&pkt);
@@ -2344,7 +2353,11 @@ static int rtmp_parse_result(URLContext *s, RTMPContext *rt, RTMPPacket *pkt)
 
     switch (pkt->type) {
     case RTMP_PT_BYTES_READ:
-        av_log(s, AV_LOG_TRACE, "received bytes read report\n");
+        if (pkt->size >= 4) {
+            rt->last_bytes_sent = AV_RB32(pkt->data);
+            av_log(s, AV_LOG_TRACE, "received bytes read report (%"PRIu64")\n",
+                   rt->last_bytes_sent);
+        }
         break;
     case RTMP_PT_CHUNK_SIZE:
         if ((ret = handle_chunk_size(s, pkt)) < 0)
@@ -2455,6 +2468,8 @@ static int get_packet(URLContext *s, int for_header)
                                        &rt->nb_prev_pkt[0])) <= 0) {
             if (ret == 0) {
                 return AVERROR(EAGAIN);
+            } else if (ret == AVERROR_EOF) {
+                return AVERROR_EOF;
             } else {
                 return AVERROR(EIO);
             }
@@ -2468,7 +2483,7 @@ static int get_packet(URLContext *s, int for_header)
             av_log(s, AV_LOG_DEBUG, "Sending bytes read report\n");
             if ((ret = gen_bytes_read(s, rt, rpkt.timestamp + 1)) < 0) {
-                ff_rtmp_packet_destroy(&rpkt);
-                return ret;
+                /* not returning here, so rpkt must stay valid for the rest of the iteration */
+                av_log(s, AV_LOG_DEBUG, "Failed ACKing to the server, connection finished?\n");
             }
             rt->last_bytes_read = rt->bytes_read;
         }
@@ -2529,6 +2544,7 @@ static int rtmp_close(URLContext *h)
 {
     RTMPContext *rt = h->priv_data;
     int ret = 0, i, j;
+    uint8_t c;
 
     if (!rt->is_input) {
         rt->flv_data = NULL;
@@ -2547,7 +2563,13 @@ static int rtmp_close(URLContext *h)
 
     free_tracked_methods(rt);
     av_freep(&rt->flv_data);
+    if (rt->stream && rt->state > STATE_HANDSHAKED) {
+        /* half-close the write side, then drain until the read stops yielding data */
+        ffurl_shutdown(rt->stream, AVIO_FLAG_WRITE);
+        while (ffurl_read(rt->stream, &c, 1) > 0)
+            ;
+    }
     ffurl_closep(&rt->stream);
     return ret;
 }
 
@@ -2835,11 +2857,12 @@ reconnect:
 
     rt->receive_report_size = 1048576;
     rt->bytes_read    = 0;
+    rt->bytes_sent    = 0;
     rt->has_audio     = 0;
     rt->has_video     = 0;
     rt->received_metadata = 0;
     rt->last_bytes_read = 0;
-    rt->max_sent_unacked = 2500000;
+    rt->last_bytes_sent = 0;
     rt->duration      = 0;
 
     av_log(s, AV_LOG_DEBUG, "Proto = %s, path = %s, app = %s, fname = %s\n",
@@ -3095,8 +3118,17 @@ static int rtmp_write(URLContext *s, const uint8_t *buf, int size)
         return size;
     rt->flv_nb_packets = 0;
 
-    /* set stream into nonblocking mode */
-    rt->stream->flags |= AVIO_FLAG_NONBLOCK;
+    /* Is it time to start throttling? The acknowledged value is read as a
+     * 32-bit number, so compare the unacknowledged amount modulo 2^32. */
+    if ((int32_t)((uint32_t)rt->bytes_sent - (uint32_t)rt->last_bytes_sent) <
+        (int64_t)rt->max_sent_unacked) {
+        /* set stream into nonblocking mode */
+        rt->stream->flags |= AVIO_FLAG_NONBLOCK;
+    } else {
+        av_log(s, AV_LOG_DEBUG,
+               "Throttling, sent %"PRIu64" bytes, client has acknowledged %"PRIu64" bytes\n",
+               rt->bytes_sent, rt->last_bytes_sent);
+    }
 
     /* try to read one byte from the stream */
     ret = ffurl_read(rt->stream, &c, 1);
@@ -3139,6 +3171,7 @@ static const AVOption rtmp_options[] = {
     {"rtmp_flush_interval", "Number of packets flushed in the same request (RTMPT only).", OFFSET(flush_interval), AV_OPT_TYPE_INT, {.i64 = 10}, 0, INT_MAX, ENC},
     {"rtmp_enhanced_codecs", "Specify the codec(s) to use in an enhanced rtmp live stream", OFFSET(enhanced_codecs), AV_OPT_TYPE_STRING, {.str = NULL }, 0, 0, ENC},
+    {"rtmp_window", "Maximum number of unacknowledged bytes sent before throttling.", OFFSET(max_sent_unacked), AV_OPT_TYPE_INT, {.i64 = 2500000}, 1, INT_MAX, DEC|ENC},
     {"rtmp_live", "Specify that the media is a live stream.", OFFSET(live), AV_OPT_TYPE_INT, {.i64 = -2}, INT_MIN, INT_MAX, DEC, "rtmp_live"},
     {"any", "both", 0, AV_OPT_TYPE_CONST, {.i64 = -2}, 0, 0, DEC, "rtmp_live"},
     {"live", "live stream", 0, AV_OPT_TYPE_CONST, {.i64 = -1}, 0, 0, DEC, "rtmp_live"},
     {"recorded", "recorded stream", 0, AV_OPT_TYPE_CONST, {.i64 = 0}, 0, 0, DEC, "rtmp_live"},