Skip to content

Commit

Permalink
Videodownload 99% stop fix (#792)
Browse files Browse the repository at this point in the history
* Delay ThrottledStream stopwatch creation to first read

* Fix incorrect step for Finalizing Video

* Use CancellationTokenSource timers to cancel DownloadFileAsync after a minimum amount of time

* Bump timeout from 30 seconds to 60

* Add comment explaining CTS trickery

* Increase maxRestartedThreads
  • Loading branch information
ScrubN authored Aug 30, 2023
1 parent e6f957c commit 69a66da
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 38 deletions.
6 changes: 4 additions & 2 deletions TwitchDownloaderCore/Tools/ThrottledStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ public class ThrottledStream : Stream
{
public readonly Stream BaseStream;
public readonly int MaximumBytesPerSecond;
private readonly Stopwatch _watch = Stopwatch.StartNew();
private long _totalBytesRead = 0;
private Stopwatch _watch;
private long _totalBytesRead;

/// <summary>
/// Initializes a new instance of the <see cref="ThrottledStream"/> class
Expand Down Expand Up @@ -79,6 +79,8 @@ private async Task<int> GetBytesToReturnAsync(int count)
if (MaximumBytesPerSecond <= 0)
return count;

_watch ??= Stopwatch.StartNew();

var canSend = (long)(_watch.ElapsedMilliseconds * (MaximumBytesPerSecond / 1000.0));

var diff = (int)(canSend - _totalBytesRead);
Expand Down
134 changes: 98 additions & 36 deletions TwitchDownloaderCore/VideoDownloader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public async Task DownloadAsync(CancellationToken cancellationToken)
GqlVideoChapterResponse videoChapterResponse = await TwitchHelper.GetVideoChapters(downloadOptions.Id);

var (playlistUrl, bandwidth) = await GetPlaylistUrl();
string baseUrl = playlistUrl.Substring(0, playlistUrl.LastIndexOf('/') + 1);
var baseUrl = new Uri(playlistUrl[..(playlistUrl.LastIndexOf('/') + 1)], UriKind.Absolute);

var videoLength = TimeSpan.FromSeconds(videoInfoResponse.data.video.lengthSeconds);
CheckAvailableStorageSpace(bandwidth, videoLength);
Expand Down Expand Up @@ -165,7 +165,7 @@ private void CheckAvailableStorageSpace(int bandwidth, TimeSpan videoLength)
}
}

private async Task DownloadVideoPartsAsync(List<string> videoPartsList, string baseUrl, string downloadFolder, double vodAge, CancellationToken cancellationToken)
private async Task DownloadVideoPartsAsync(List<string> videoPartsList, Uri baseUrl, string downloadFolder, double vodAge, CancellationToken cancellationToken)
{
var partCount = videoPartsList.Count;
var videoPartsQueue = new ConcurrentQueue<string>(videoPartsList);
Expand All @@ -181,33 +181,68 @@ private async Task DownloadVideoPartsAsync(List<string> videoPartsList, string b
LogDownloadThreadExceptions(downloadExceptions);
}

private Task StartNewDownloadThread(ConcurrentQueue<string> videoPartsQueue, string baseUrl, string downloadFolder, double vodAge, CancellationToken cancellationToken)
private Task StartNewDownloadThread(ConcurrentQueue<string> videoPartsQueue, Uri baseUrl, string downloadFolder, double vodAge, CancellationToken cancellationToken)
{
return Task.Factory.StartNew(state =>
return Task.Factory.StartNew(
ExecuteDownloadThread,
new Tuple<ConcurrentQueue<string>, HttpClient, Uri, string, double, int, CancellationToken>(
videoPartsQueue, _httpClient, baseUrl, downloadFolder, vodAge, downloadOptions.ThrottleKib, cancellationToken),
cancellationToken,
TaskCreationOptions.LongRunning,
TaskScheduler.Current);

static void ExecuteDownloadThread(object state)
{
var (partQueue, httpClient, rootUrl, cacheFolder, videoAge, throttleKib, cancelToken) =
(Tuple<ConcurrentQueue<string>, HttpClient, Uri, string, double, int, CancellationToken>)state;

using var cts = new CancellationTokenSource();
cancelToken.Register(PropagateCancel, cts);

while (!partQueue.IsEmpty)
{
var (partQueue, rootUrl, cacheFolder, videoAge, throttleKib, cancelToken) =
(Tuple<ConcurrentQueue<string>, string, string, double, int, CancellationToken>)state;
cancelToken.ThrowIfCancellationRequested();

while (!partQueue.IsEmpty)
string videoPart = null;
try
{
if (partQueue.TryDequeue(out videoPart))
{
DownloadVideoPartAsync(httpClient, rootUrl, videoPart, cacheFolder, videoAge, throttleKib, cts).GetAwaiter().GetResult();
}
}
catch
{
if (partQueue.TryDequeue(out var request))
if (videoPart != null && !cancelToken.IsCancellationRequested)
{
DownloadVideoPartAsync(rootUrl, request, cacheFolder, videoAge, throttleKib, cancelToken).GetAwaiter().GetResult();
// Requeue the video part now instead of deferring to the verifier since we already know it's bad
partQueue.Enqueue(videoPart);
}

Task.Delay(77, cancelToken).GetAwaiter().GetResult();
throw;
}
}, new Tuple<ConcurrentQueue<string>, string, string, double, int, CancellationToken>(
videoPartsQueue, baseUrl, downloadFolder, vodAge, downloadOptions.ThrottleKib, cancellationToken),
cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Current);

const int aPrimeNumber = 71;
Thread.Sleep(aPrimeNumber);
}
}

static void PropagateCancel(object tokenSourceToCancel)
{
try
{
((CancellationTokenSource)tokenSourceToCancel)?.Cancel();
}
catch (ObjectDisposedException) { }
}
}

private async Task<Dictionary<int, Exception>> WaitForDownloadThreads(Task[] tasks, ConcurrentQueue<string> videoPartsQueue, string baseUrl, string downloadFolder, double vodAge, int partCount, CancellationToken cancellationToken)
private async Task<IReadOnlyCollection<Exception>> WaitForDownloadThreads(Task[] tasks, ConcurrentQueue<string> videoPartsQueue, Uri baseUrl, string downloadFolder, double vodAge, int partCount, CancellationToken cancellationToken)
{
var allThreadsExited = false;
var previousDoneCount = 0;
var restartedThreads = 0;
var maxRestartedThreads = Math.Max(downloadOptions.DownloadThreads, 10);
var maxRestartedThreads = (int)Math.Max(downloadOptions.DownloadThreads * 1.5, 10);
var downloadExceptions = new Dictionary<int, Exception>();
do
{
Expand Down Expand Up @@ -249,17 +284,17 @@ private async Task<Dictionary<int, Exception>> WaitForDownloadThreads(Task[] tas
throw new AggregateException("The download thread restart limit was reached.", downloadExceptions.Values);
}

return downloadExceptions;
return downloadExceptions.Values;
}

private void LogDownloadThreadExceptions(Dictionary<int, Exception> downloadExceptions)
private void LogDownloadThreadExceptions(IReadOnlyCollection<Exception> downloadExceptions)
{
if (downloadExceptions.Count == 0)
return;

var culpritList = new List<string>();
var sb = new StringBuilder();
foreach (var downloadException in downloadExceptions.Values)
foreach (var downloadException in downloadExceptions)
{
var ex = downloadException switch
{
Expand Down Expand Up @@ -289,7 +324,7 @@ private void LogDownloadThreadExceptions(Dictionary<int, Exception> downloadExce
_progress.Report(new ProgressReport(ReportType.Log, sb.ToString()));
}

private async Task VerifyDownloadedParts(List<string> videoParts, string baseUrl, string downloadFolder, double vodAge, CancellationToken cancellationToken)
private async Task VerifyDownloadedParts(List<string> videoParts, Uri baseUrl, string downloadFolder, double vodAge, CancellationToken cancellationToken)
{
var failedParts = new List<string>();
var partCount = videoParts.Count;
Expand Down Expand Up @@ -408,34 +443,35 @@ private static void HandleFfmpegOutput(string output, Regex encodingTimeRegex, d
// Apparently it is possible for the percent to not be within the range of 0-100. lay295#716
if (percent is < 0 or > 100)
{
progress.Report(new ProgressReport(ReportType.SameLineStatus, "Finalizing Video... [4/4]"));
progress.Report(new ProgressReport(ReportType.SameLineStatus, "Finalizing Video... [5/5]"));
progress.Report(new ProgressReport(0));
}
else
{
progress.Report(new ProgressReport(ReportType.SameLineStatus, $"Finalizing Video {percent}% [4/4]"));
progress.Report(new ProgressReport(ReportType.SameLineStatus, $"Finalizing Video {percent}% [5/5]"));
progress.Report(new ProgressReport(percent));
}
}

private async Task DownloadVideoPartAsync(string baseUrl, string videoPartName, string downloadFolder, double vodAge, int throttleKib, CancellationToken cancellationToken)
/// <remarks>The <paramref name="cancellationTokenSource"/> may be canceled by this method.</remarks>
private static async Task DownloadVideoPartAsync(HttpClient httpClient, Uri baseUrl, string videoPartName, string downloadFolder, double vodAge, int throttleKib, CancellationTokenSource cancellationTokenSource)
{
bool tryUnmute = vodAge < 24;
int errorCount = 0;
int timeoutCount = 0;
while (true)
{
cancellationToken.ThrowIfCancellationRequested();
cancellationTokenSource.Token.ThrowIfCancellationRequested();

try
{
if (tryUnmute && videoPartName.Contains("-muted"))
{
await DownloadFileTaskAsync(baseUrl + videoPartName.Replace("-muted", ""), Path.Combine(downloadFolder, RemoveQueryString(videoPartName)), throttleKib, cancellationToken);
await DownloadFileAsync(httpClient, new Uri(baseUrl, videoPartName.Replace("-muted", "")), Path.Combine(downloadFolder, RemoveQueryString(videoPartName)), throttleKib, cancellationTokenSource);
}
else
{
await DownloadFileTaskAsync(baseUrl + videoPartName, Path.Combine(downloadFolder, RemoveQueryString(videoPartName)), throttleKib, cancellationToken);
await DownloadFileAsync(httpClient, new Uri(baseUrl, videoPartName), Path.Combine(downloadFolder, RemoveQueryString(videoPartName)), throttleKib, cancellationTokenSource);
}

return;
Expand All @@ -446,21 +482,23 @@ private async Task DownloadVideoPartAsync(string baseUrl, string videoPartName,
}
catch (HttpRequestException)
{
if (++errorCount > 10)
const int maxRetries = 10;
if (++errorCount > maxRetries)
{
throw new HttpRequestException($"Video part {videoPartName} failed after 10 retries");
throw new HttpRequestException($"Video part {videoPartName} failed after {maxRetries} retries");
}

await Task.Delay(1_000 * errorCount, cancellationToken);
await Task.Delay(1_000 * errorCount, cancellationTokenSource.Token);
}
catch (TaskCanceledException ex) when (ex.Message.Contains("HttpClient.Timeout"))
{
if (++timeoutCount > 3)
const int maxRetries = 3;
if (++timeoutCount > maxRetries)
{
throw new HttpRequestException($"Video part {videoPartName} timed out 3 times");
throw new HttpRequestException($"Video part {videoPartName} timed out {maxRetries} times");
}

await Task.Delay(5_000 * timeoutCount, cancellationToken);
await Task.Delay(5_000 * timeoutCount, cancellationTokenSource.Token);
}
}
}
Expand Down Expand Up @@ -558,17 +596,36 @@ private async Task DownloadVideoPartAsync(string baseUrl, string videoPartName,
/// <summary>
/// Downloads the requested <paramref name="url"/> to the <paramref name="destinationFile"/> without storing it in memory.
/// </summary>
/// <param name="httpClient">The <see cref="HttpClient"/> to perform the download operation.</param>
/// <param name="url">The url of the file to download.</param>
/// <param name="destinationFile">The path to the file where download will be saved.</param>
/// <param name="throttleKib">The maximum download speed in kibibytes per second, or -1 for no maximum.</param>
/// <param name="cancellationToken">The cancellation token to cancel the operation.</param>
private async Task DownloadFileTaskAsync(string url, string destinationFile, int throttleKib, CancellationToken cancellationToken = default)
/// <param name="cancellationTokenSource">A <see cref="CancellationTokenSource"/> containing a <see cref="CancellationToken"/> to cancel the operation.</param>
/// <remarks>The <paramref name="cancellationTokenSource"/> may be canceled by this method.</remarks>
private static async Task DownloadFileAsync(HttpClient httpClient, Uri url, string destinationFile, int throttleKib, CancellationTokenSource cancellationTokenSource = null)
{
var request = new HttpRequestMessage(HttpMethod.Get, url);

using var response = await _httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false);
var cancellationToken = cancellationTokenSource?.Token ?? CancellationToken.None;

using var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false);
response.EnsureSuccessStatusCode();

// Why are we setting a CTS CancelAfter timer? See lay295#265
const int sixtySeconds = 60;
if (throttleKib == -1 || !response.Content.Headers.ContentLength.HasValue)
{
cancellationTokenSource?.CancelAfter(TimeSpan.FromSeconds(sixtySeconds));
}
else
{
const double oneKibibyte = 1024d;
cancellationTokenSource?.CancelAfter(TimeSpan.FromSeconds(Math.Max(
sixtySeconds,
response.Content.Headers.ContentLength!.Value / oneKibibyte / throttleKib * 8 // Allow up to 8x the shortest download time given the thread bandwidth
)));
}

switch (throttleKib)
{
case -1:
Expand All @@ -581,20 +638,25 @@ private async Task DownloadFileTaskAsync(string url, string destinationFile, int
{
try
{
await using var throttledStream = new ThrottledStream(await response.Content.ReadAsStreamAsync(cancellationToken), throttleKib);
await using var contentStream = await response.Content.ReadAsStreamAsync(cancellationToken);
await using var throttledStream = new ThrottledStream(contentStream, throttleKib);
await using var fs = new FileStream(destinationFile, FileMode.Create, FileAccess.Write, FileShare.Read);
await throttledStream.CopyToAsync(fs, cancellationToken).ConfigureAwait(false);
}
catch (IOException e) when (e.Message.Contains("EOF"))
{
// The throttled stream throws when it reads an unexpected EOF, try again without the limiter
// If we get an exception for EOF, it may be related to the throttler. Try again without it.
// TODO: Log this somehow
await Task.Delay(2_000, cancellationToken);
goto case -1;
}
break;
}
}

// Reset the cts timer so it can be reused for the next download on this thread.
// Is there a friendlier way to do this? Yes. Does it involve creating and destroying 4,000 CancellationTokenSources that are almost never cancelled? Also Yes.
cancellationTokenSource?.CancelAfter(TimeSpan.FromMilliseconds(uint.MaxValue - 1));
}

private async Task CombineVideoParts(string downloadFolder, List<string> videoParts, CancellationToken cancellationToken)
Expand Down

0 comments on commit 69a66da

Please sign in to comment.