异步流建立流式处理数据源模型。 数据流经常异步检索或生成元素。 它们为异步流式处理数据源提供了自然编程模型。
先决条件
需要将计算机设置为运行 .NET,包括 C# 编译器。 C# 编译器随附于 Visual Studio 2022 或 .NET SDK。
将需要创建 GitHub 访问令牌,以便可以访问 GitHub GraphQL 终结点。 为 GitHub 访问令牌选择以下权限:
- repo:status
- public_repo
将访问令牌保存在安全位置,以便可以使用它来访问 GitHub API 终结点。保护个人访问令牌。 任何带有你的个人访问令牌的软件都可以使用你的访问权限进行 GitHub API 调用。
运行初学者应用程序
可以从 asynchronous-programming/snippets 文件夹中的 dotnet/docs 存储库中获取本文中使用的初学者应用程序的代码。
初学者应用程序是一个控制台应用程序,它使用 GitHub GraphQL 接口检索最近在 dotnet/docs 存储库中编写的问题。 首先来看一下以下初学者应用 Main 方法的代码:
static async Task Main(string[] args)
{//Follow these steps to create a GitHub Access Token// https://help.github.com/articles/creating-a-personal-access-token-for-the-command-line/#creating-a-token//Select the following permissions for your GitHub Access Token:// - repo:status// - public_repo// Replace the 3rd parameter to the following code with your GitHub access token.var key = GetEnvVariable("GitHubKey","You must store your GitHub key in the 'GitHubKey' environment variable","");var client = new GitHubClient(new Octokit.ProductHeaderValue("IssueQueryDemo")){Credentials = new Octokit.Credentials(key)};var progressReporter = new progressStatus((num) =>{Console.WriteLine($"Received {num} issues in total");});CancellationTokenSource cancellationSource = new CancellationTokenSource();try{var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",cancellationSource.Token, progressReporter);foreach(var issue in results)Console.WriteLine(issue);}catch (OperationCanceledException){Console.WriteLine("Work has been cancelled");}
}
可以将 GitHubKey 环境变量设置为个人访问令牌,也可以将对 GetEnvVariable 的调用中的最后一个参数替换为个人访问令牌。 如果要与其他人共享源,请不要将访问代码放在源代码中。 不要将访问代码上传到共享源存储库。
在创建 GitHub 客户端后,Main 中的代码将创建一个进度报告对象和一个取消令牌。 创建这些对象之后,Main 调用 RunPagedQueryAsync 来检索最近创建的 250 个问题。 任务完成后,将显示结果。
在运行初学者应用程序时,可以对该应用程序的运行方式进行一些重要观察。 将看到从 GitHub 返回的每个页面的进度报告。 在 GitHub 返回问题的每个新页面之前,可以观察到明显的停顿。 最后,只有在从 GitHub 检索到所有 10 个页面之后,问题才会显示出来。
检查实现情况
该实现揭示了你观察到上一部分中讨论的行为的原因。 检查 RunPagedQueryAsync 的代码:
private static async Task<JArray> RunPagedQueryAsync(GitHubClient client, string queryText, string repoName, CancellationToken cancel, IProgress<int> progress)
{var issueAndPRQuery = new GraphQLRequest{Query = queryText};issueAndPRQuery.Variables["repo_name"] = repoName;JArray finalResults = new JArray();bool hasMorePages = true;int pagesReturned = 0;int issuesReturned = 0;// Stop with 10 pages, because these are large repos:while (hasMorePages && (pagesReturned++ < 10)){var postBody = issueAndPRQuery.ToJsonText();var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),postBody, "application/json", "application/json");JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);int totalCount = (int)issues(results)["totalCount"]!;hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();issuesReturned += issues(results)["nodes"]!.Count();finalResults.Merge(issues(results)["nodes"]!);progress?.Report(issuesReturned);cancel.ThrowIfCancellationRequested();}return finalResults;JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}
此方法执行的第一个操作是使用 GraphQLRequest
类创建 POST 对象:
public class GraphQLRequest
{[JsonProperty("query")]public string? Query { get; set; }[JsonProperty("variables")]public IDictionary<string, object> Variables { get; } = new Dictionary<string, object>();public string ToJsonText() =>JsonConvert.SerializeObject(this);
}
这有助于构成 POST 对象正文,并使用 ToJsonText 方法将其正确转换为以单个字符串呈现的 JSON,该方法从请求正文中删除所有换行符,并用 \(反斜杠)转义字符对其进行标记。
让我们集中讨论前面代码的分页算法和异步结构。 (有关 GitHub GraphQL API 的详细信息,可以参考 GitHub GraphQL 文档。)RunPagedQueryAsync 方法按从最新到最旧的顺序枚举问题。 它每页请求 25 个问题,并检查响应的 pageInfo 结构以继续上一页的操作。 这遵循了 GraphQL 对多页响应的标准分页支持。 响应包括 pageInfo 对象,该对象包含用于请求上一页的 hasPreviousPages 值和 startCursor 值。 问题在 nodes 数组中。 RunPagedQueryAsync 方法将这些节点追加到一个数组中,其中包含所有页面的所有结果。
在检索和还原结果页之后,RunPagedQueryAsync 将报告进度并检查是否取消。 如果已请求取消,RunPagedQueryAsync 将引发 OperationCanceledException。
此代码中有几个可以改进的元素。 最重要的是,RunPagedQueryAsync 必须为返回的所有问题分配存储空间。 该示例在 250 个问题处停止,因为检索所有未决问题需要更多的内存来存储所有检索到的问题。 支持进度报告和取消的协议使得算法在第一次读取时更加难以理解。 涉及更多类型和 API。 必须通过 CancellationTokenSource 及其关联的 CancellationToken 跟踪通信,以了解在何处请求取消,以及在何处授予取消。