Skip to content

Commit

Permalink
wip: checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanchriswhite committed Apr 19, 2023
1 parent 3030b67 commit 2fa8117
Show file tree
Hide file tree
Showing 20 changed files with 281 additions and 255 deletions.
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ services:
- .env
environment:
<<: *psql-env
<<: *redis-env
# <<: *redis-env
# NODE_ENV:
# command: "yarn index && sleep 500"
volumes:
Expand Down
50 changes: 27 additions & 23 deletions indexer/fetch.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
import {GraphQLClient} from "graphql-request";
import {issueCommentsQuery} from "./queries.js";
import {issueCommentsQuery} from "./queries.ts";
import {
Comment,
Commentable,
CommentablesPage,
CommentableType,
CommentsPage,
IssueCommentsResponse,
nextQueryArgs,
QueryVars,
ResponseComment,
ResponseIssue
} from "../lib/types/index.js";
} from "../lib/types/index.ts";
import {FetchResult} from "../lib/types/fetch";

// TODO: move
Expand Down Expand Up @@ -63,21 +63,20 @@ function byCreatedAt(a: Timestamped, b: Timestamped) {
return new Date(b.createdAt).getTime() - new Date(a.createdAt).getTime();
}

export async function fetchIssueComments(client: GraphQLClient, {
owner,
name,
issues
}: nextQueryArgs): Promise<FetchResult> {
export async function fetchIssueComments(client: GraphQLClient, queryVars: QueryVars): Promise<FetchResult> {
console.log("fetchIssueComments")

const {owner, name, pageSize} = queryVars;
const pageVars = queryVars.pageVars ?? {};
const issuesResponse: IssueCommentsResponse = await client.request(issueCommentsQuery, {
owner,
name,
maxIssues: issues?.max,
maxComments: issues?.comments.max,
afterIssue: issues?.comments.after,
beforeIssue: issues?.comments.before,
afterComment: issues?.comments.after,
})
maxIssues: pageSize.commentables,
maxComments: pageSize.comments,
afterIssue: pageVars?.commentables?.after,
beforeIssue: pageVars?.commentables?.before,
afterComment: pageVars?.comments?.after,
});

console.log("got issue comments resopnse")
const {nodes: responseIssues, pageInfo: {hasNextPage, endCursor}} = issuesResponse.repository.issues
Expand All @@ -90,18 +89,24 @@ export async function fetchIssueComments(client: GraphQLClient, {
};
}

// collect & normalize all issues
const normalizedIssues: Commentable[] = responseIssues.map(issue => ({
id: issue.id,
type: CommentableType.ISSUE,
}));

const [issuesWithoutNext, issuesWithNext] = partition(responseIssues, hasMoreComments)
// TODO: refactor - separate normalized issues from normalized comments...
const normalizedIssueComments = issuesWithoutNext.flatMap((issue: ResponseIssue) => issue.comments.nodes)
.map(normalizeResponseComment)
// collect & normalize `Comment`s from issues w/o more comments
const normalizedIssueComments = responseIssues
.flatMap((issue: ResponseIssue) => issue.comments.nodes)
.map(normalizeResponseComment);

// partition issues into w/ and w/o more comments
// TODO: refactor
const [_, issuesWithMoreComments] = partition(responseIssues, hasMoreComments);

// create `CommentsPage`s from issues w/ more comments
// TODO: refactor (normalize)
const pendingCommentsPages = issuesWithNext.map(issue => {
const nextCommentsPages = issuesWithMoreComments.map(issue => {
const {
pageInfo: {endCursor}
} = issue.comments;
Expand All @@ -110,16 +115,15 @@ export async function fetchIssueComments(client: GraphQLClient, {
commentableId: issue.id,
afterComment: endCursor,
} as CommentsPage;
})
});

return {
// TODO: normalize issues (drop unused properties)
commentables: normalizedIssues,
// sort issueComments by createdAt in descending order (i.e. newest first)
comments: normalizedIssueComments.sort(byCreatedAt),
next: {
commentables: nextIssuePage,
comments: pendingCommentsPages,
comments: nextCommentsPages,
},
};
}
Expand All @@ -129,7 +133,7 @@ export async function fetchCommentsPage(client: GraphQLClient, page: CommentsPag
name,
PRs,
issues,
}: nextQueryArgs): Promise<Comment[]> {
}: QueryVars): Promise<Comment[]> {
const {commentableId, afterComment} = page;

// issues: {
Expand Down
2 changes: 1 addition & 1 deletion indexer/fsm/machine.test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import {interpret} from "xstate";
import newUpdateMachine from "./machine";
import {Context} from "./types";
import {GraphQLClient} from "graphql-request";
import DoneCallback = jest.DoneCallback;
import {Context} from "../../lib/types/indexer";

jest.mock<GraphQLClient>('graphql-request');

Expand Down
5 changes: 3 additions & 2 deletions indexer/fsm/machine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ import {
upsertAll,
logError,
logUpdated
} from "./services.js";
import {Context} from "./types.js";
} from "./services.ts";

import {Context} from "../../lib/types/indexer";

// TODO: decompose into multiple machines
export function newUpdateMachine(context: Context) {
Expand Down
70 changes: 27 additions & 43 deletions indexer/fsm/services.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import Queue from "bull";

import {Context} from "./types.js";
import {Comment, CommentsPage, Commentable} from "../../lib/types/index.js";
import {fetchIssueComments} from "../fetch.js";
import {Comment, Commentable, CommentablesPage, CommentableType, CommentsPage} from "../../lib/types/index.ts";
import {Context} from "../../lib/types/indexer";
import {Job} from "../../lib/types/job";

export async function logUpdated(ctx: Context, event: any) {
console.log("updated state!");
Expand All @@ -13,55 +11,41 @@ export async function logError(ctx: Context, event: any) {
}

export async function queueAll(ctx: Context, event: any) {
const {queryVars: {owner, name, pageSize}} = ctx;
const {commentables, comments, next} = await fetchIssueComments(ctx.clients.github, {
owner, name,
PRs: {
max: pageSize.commentables,
comments: {max: pageSize.comments}
},
issues: {
max: pageSize.commentables,
comments: {max: pageSize.comments}
},
});
const {queues: {fetching, storing}} = ctx;

// TODO: move?
const firstIssuePage = {commentableType: CommentableType.ISSUE,};
const firstIssueJob: Job<CommentablesPage> = {ctx, nextPage: firstIssuePage};
fetching.commentables.add("enqueue", firstIssueJob);

// TODO: PRs and reviews...


const {queues} = ctx;
// Enqueue normalized commentables and comments
commentables.forEach((commentable: Commentable) =>
queues.storing.commentables.add("enqueue", {commentable})
);
comments.forEach((comment: Comment) =>
queues.storing.comments.add("enqueue", {comment})
);

// Enqueue CommentablePage and CommentPage objects for the next pages
if (next.commentables) {
queues.fetching.commentables.add("enqueue", {nextPage: next.commentables});
}
if (next.comments) {
next.comments.forEach((commentPage: CommentsPage) =>
queues.fetching.comments.add("enqueue", {commentPage})
);
}
const {commentables, comments} = ctx.processes.fetching;

// Process nextCommentables queue
const {storing, fetching} = ctx.processes;
queues.fetching.commentables.process(
fetching.commentables.concurrency,
fetching.commentables.scriptPath,
// Kick-off fetching commentables queue processing (dequeuing)
fetching.commentables.process(
commentables.concurrency,
commentables.scriptPath,
);

// Process nextComments queue
queues.fetching.comments.process(
fetching.comments.concurrency,
fetching.comments.scriptPath,
// Kick-off fetching comments queue processing (dequeuing)
fetching.comments.process(
comments.concurrency,
comments.scriptPath,
);
}

export async function upsertAll(ctx: Context, event: any) {

// const {endpointURL} = ctx.gqlClientConfigs.postgraphile;
const {commentables, comments} = ctx.processes.storing;

// TODO: add authentication
// postgraphile graphql client
// const client = new GraphQLClient(endpointURL);

// Process storing commentables queue
ctx.queues.storing.commentables.process(
commentables.concurrency,
Expand Down
43 changes: 0 additions & 43 deletions indexer/fsm/types.ts
Original file line number Diff line number Diff line change
@@ -1,43 +0,0 @@
import {Queue} from "bull";
import {GraphQLClient} from "graphql-request";

// TODO: consolidate w/ lib/types (?)
export interface Context {
clients: {
github: GraphQLClient,
postgraphile: GraphQLClient,
}
queryVars: {
owner: string;
name: string;
pageSize: {
commentables: number,
comments: number,
}
}
queues: {
fetching: {
commentables: Queue,
comments: Queue,
},
storing: {
commentables: Queue,
comments: Queue,
}
}
processes: {
fetching: {
commentables: ProcessConfig,
comments: ProcessConfig,
},
storing: {
commentables: ProcessConfig,
comments: ProcessConfig,
}
}
}

export interface ProcessConfig {
concurrency: number;
scriptPath: string;
}
54 changes: 20 additions & 34 deletions indexer/main.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import Queue from "bull";
import {GraphQLClient} from 'graphql-request';
import {interpret} from "xstate";
import {newUpdateMachine} from "./fsm/machine.js";
import {Context} from "./fsm/types.js";
import {Comment, Commentable, CommentablesPage, CommentsPage} from "../lib/types/index.js";
import {newUpdateMachine} from "./fsm/machine.ts";

import {Context} from "../lib/types/indexer";

// TODO: check `NODE_ENV`
const defaults = {
Expand All @@ -26,39 +24,27 @@ const commentablesPageSize = 3;
const commentsPageSize = 100;

// TODO: parameterize (?)
const storingCommentablesProcessPath = "./proceses/storing_commentables.js",
storingCommentsProcessPath = "./processes/storing_comments.js",
fetchCommentablesProcessPath = "./proceses/fetch_commentables.js",
fetchCommentsProcessPath = "./processes/fetch_comments.js",
// TODO: fix paths; weird relativeness; OS-agnostic
const storingCommentablesProcessPath = `${__dirname}/queue/processes/storing_commentables.ts`,
storingCommentsProcessPath = `${__dirname}/queue/processes/storing_comments.ts`,
fetchCommentablesProcessPath = `${__dirname}/queue/processes/fetching_commentables.ts`,
fetchCommentsProcessPath = `${__dirname}/queue/processes/fetching_comments.ts`,
storingConcurrency = 2,
fetchingConcurrency = 2
;

const [owner, name] = ghRepo.split("/")


// github graphql client
const ghClient = new GraphQLClient(GITHUB_GRAPHQL_URL, {
headers: {
Authorization: `Bearer ${ghAccessToken}`,
},
});

// TODO: add authentication
// postgraphile graphql client
const pgClient = new GraphQLClient(POSTGRAPHILE_URL);

// TODO: replace with `Bull` queues
const storingCommentablesQueue = new Queue("pendingCommentables");
const storingCommentsQueue = new Queue("pendingComments");
const fetchingCommentablesQueue = new Queue("nextCommentables");
const fetchingCommentsQueue = new Queue("nextComments");

async function run() {
const context: Context = {
clients: {
github: ghClient,
postgraphile: pgClient,
clientConfigs: {
github: {
endpointURL: GITHUB_GRAPHQL_URL,
classicAccessToken: ghAccessToken,
},
postgraphile: {
endpointURL: POSTGRAPHILE_URL,
},
},
queryVars: {
owner, name,
Expand All @@ -69,12 +55,12 @@ async function run() {
},
queues: {
fetching: {
commentables: fetchingCommentablesQueue,
comments: fetchingCommentsQueue,
commentables: "nextCommentables",
comments: "nextComments",
},
storing: {
commentables: storingCommentablesQueue,
comments: storingCommentsQueue,
commentables: "pendingCommentables",
comments: "pendingComments",
},
},
processes: {
Expand Down
8 changes: 4 additions & 4 deletions indexer/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ export const pullRequestCommentsQuery = gql`
$beforeComment: String
) {
repository(owner: $owner, name: $name) {
pullRequests(first: $maxPRs, after: $afterPR, before: $beforePR) {
pullRequests(first: $maxPRs, after: $afterPR, before: $beforePR, orderBy: DESC) {
pageInfo {
hasNextPage
endCursor
}
nodes {
createdAt
updatedAt
comments(first: $maxComments, after: $afterComment, before: $beforeComment) {
comments(first: $maxComments, after: $afterComment, before: $beforeComment, orderBy: DESC) {
pageInfo {
hasNextPage
endCursor
Expand Down Expand Up @@ -54,15 +54,15 @@ export const issueCommentsQuery = gql`
$beforeComment: String
) {
repository(owner: $owner, name: $name) {
issues(first: $maxIssues, after: $afterIssue, before: $beforeIssue) {
issues(first: $maxIssues, after: $afterIssue, before: $beforeIssue, orderBy: DESC) {
pageInfo {
hasNextPage
endCursor
}
nodes {
createdAt
updatedAt
comments(first: $maxComments, after: $afterComment, before: $beforeComment) {
comments(first: $maxComments, after: $afterComment, before: $beforeComment, orderBy: DESC) {
pageInfo {
hasNextPage
endCursor
Expand Down
Loading

0 comments on commit 2fa8117

Please sign in to comment.