Bulk updates
- Stop the crawler when the browser is closed
- Fix the data source indexing issue
- Remove link_limit from the Crawler API
- Improve GitHub indexing
- Fix loading animation completion
kursataktas authored Feb 25, 2025
1 parent 61b3624 commit 15125eb
Showing 20 changed files with 389 additions and 192 deletions.
3 changes: 2 additions & 1 deletion .lycheeignore
@@ -1,3 +1,4 @@
http://localhost:8029/
https://platform.openai.com/api-keys
https://mastodon.social/@gurubaseio
https://mastodon.social/@gurubaseio
https://x.com/gurubaseio
2 changes: 2 additions & 0 deletions src/gurubase-backend/backend/backend/settings.py
@@ -416,4 +416,6 @@

API_CONCURRENCY_THROTTLE_RATE = config('API_CONCURRENCY_THROTTLE_RATE', default='10/m')
WEBSHARE_TOKEN = config('WEBSHARE_TOKEN', default='')
GITHUB_FILE_BATCH_SIZE = config('GITHUB_FILE_BATCH_SIZE', default=100, cast=int)
CRAWL_INACTIVE_THRESHOLD_SECONDS = config('CRAWL_INACTIVE_THRESHOLD_SECONDS', default=7, cast=int)

9 changes: 7 additions & 2 deletions src/gurubase-backend/backend/core/data_sources.py
@@ -543,7 +543,7 @@ def get_user(user):
return user

@staticmethod
def start_crawl(guru_slug, user, url, link_limit=1500):
def start_crawl(guru_slug, user, url, link_limit=1500, source=CrawlState.Source.API):
from core.serializers import CrawlStateSerializer
from core.tasks import crawl_website
import re
@@ -576,7 +576,8 @@ def start_crawl(guru_slug, user, url, link_limit=1500):
status=CrawlState.Status.RUNNING,
link_limit=link_limit,
guru_type=guru_type,
user=user
user=user,
source=source
)
crawl_website.delay(url, crawl_state.id, link_limit)
return CrawlStateSerializer(crawl_state).data, 200
@@ -607,6 +608,10 @@ def get_crawl_status(guru_slug, user, crawl_id):
# Existing status logic
try:
crawl_state = CrawlState.objects.get(id=crawl_id, guru_type=guru_type)
# Update last_polled_at
crawl_state.last_polled_at = datetime.now(UTC)
crawl_state.save(update_fields=['last_polled_at'])

response_data = CrawlStateSerializer(crawl_state).data
if crawl_state.error_message:
response_data['error_message'] = crawl_state.error_message
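
For context on the hunk above: get_crawl_status now stamps last_polled_at on every status check, and the stop_inactive_ui_crawls task added in core/tasks.py below stops UI-sourced crawls once that timestamp goes stale. The sketch here shows a minimal client-side polling loop illustrating that keep-alive behaviour; the base URL, route shape and poll interval are assumptions for illustration, not part of this commit.

import time
import requests

API_BASE = "http://localhost:8029/api"  # hypothetical base URL

def poll_crawl_status(guru_slug: str, crawl_id: int, interval: float = 3.0) -> dict:
    # Each request refreshes last_polled_at server-side. If the browser (or this
    # loop) stops polling for longer than CRAWL_INACTIVE_THRESHOLD_SECONDS,
    # stop_inactive_ui_crawls marks the crawl STOPPED.
    while True:
        resp = requests.get(f"{API_BASE}/{guru_slug}/crawl/{crawl_id}/status/")  # hypothetical route
        resp.raise_for_status()
        state = resp.json()
        if state["status"] != "RUNNING":
            return state
        time.sleep(interval)  # keep the interval below the inactivity threshold
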
25 changes: 25 additions & 0 deletions src/gurubase-backend/backend/core/migrations/ (new CrawlState migration)
@@ -0,0 +1,25 @@
# Generated by Django 4.2.18 on 2025-02-24 12:33

from django.db import migrations, models
import django.utils.timezone


class Migration(migrations.Migration):

dependencies = [
('core', '0047_alter_crawlstate_user'),
]

operations = [
migrations.AddField(
model_name='crawlstate',
name='last_polled_at',
field=models.DateTimeField(auto_now_add=True, default=django.utils.timezone.now),
preserve_default=False,
),
migrations.AddField(
model_name='crawlstate',
name='source',
field=models.CharField(choices=[('UI', 'User Interface'), ('API', 'API')], default='API', max_length=30),
),
]
134 changes: 126 additions & 8 deletions src/gurubase-backend/backend/core/models.py
@@ -661,23 +661,125 @@ def save(self, *args, **kwargs):


def write_to_milvus(self):
from core.utils import embed_texts, split_text
from core.utils import embed_texts, split_text, map_extension_to_language, split_code
from core.milvus_utils import insert_vectors
from django.conf import settings

if self.in_milvus:
return

if self.type == DataSource.Type.GITHUB_REPO:
github_files = GithubFile.objects.filter(data_source=self, in_milvus=False)
logger.info(f"Writing {len(github_files)} GitHub files to Milvus")
doc_ids = []
for file in github_files:
logger.info(f"Writing {len(github_files)} GitHub files to Milvus. Repository: {self.url}")
doc_ids = self.doc_ids

# Process files in batches
batch_size = settings.GITHUB_FILE_BATCH_SIZE
for i in range(0, len(github_files), batch_size):
batch = github_files[i:i + batch_size]
logger.info(f"Processing batch {i//batch_size + 1} of {(len(github_files) + batch_size - 1)//batch_size}. Repository: {self.url}")

# First update all links in the batch
files_to_update = []
for file in batch:
if not file.link:
file.link = f'{file.repository_link}/tree/{file.data_source.default_branch}/{file.path}'
files_to_update.append(file)

if files_to_update:
GithubFile.objects.bulk_update(files_to_update, ['link'])

# Prepare all texts and metadata for the batch
all_texts = []
all_metadata = []
file_text_counts = [] # Keep track of how many text chunks each file has

for file in batch:
# Split the content into chunks
extension = file.path.split('/')[-1].split('.')[-1]
language = map_extension_to_language(extension)
if language:
splitted = split_code(
file.content,
settings.SPLIT_SIZE,
settings.SPLIT_MIN_LENGTH,
settings.SPLIT_OVERLAP,
language
)
else:
splitted = split_text(
file.content,
settings.SPLIT_SIZE,
settings.SPLIT_MIN_LENGTH,
settings.SPLIT_OVERLAP,
separators=["\n\n", "\n", ".", "?", "!", " ", ""]
)

metadata = {
"type": file.data_source.type,
"repo_link": file.repository_link,
"link": file.link, # Now we can safely use file.link as it's been updated
"repo_title": file.repo_title,
"title": file.title,
"file_path": file.path
}

# Add texts and metadata
all_texts.extend(splitted)
all_metadata.extend([metadata] * len(splitted))
file_text_counts.append(len(splitted)) # Store count of chunks for this file

# Batch embed all texts
try:
embeddings = embed_texts(all_texts)
except Exception as e:
logger.error(f"Error embedding texts in batch: {traceback.format_exc()}")
continue

if embeddings is None:
logger.error("Embeddings is None for batch")
continue

# Prepare documents for Milvus
docs = []
split_num = 0
guru_slug = self.guru_type.slug

for i, (text, metadata, embedding) in enumerate(zip(all_texts, all_metadata, embeddings)):
split_num += 1
docs.append({
"metadata": {**metadata, "split_num": split_num},
"text": text,
"vector": embedding,
"guru_slug": guru_slug,
})

# Write batch to Milvus
collection_name = settings.GITHUB_REPO_CODE_COLLECTION_NAME
try:
ids = file.write_to_milvus()
doc_ids += ids
batch_ids = list(insert_vectors(collection_name, docs, code=True))
if len(batch_ids) != len(docs):
logger.error(f"Error writing batch to Milvus: {len(batch_ids)} != {len(docs)}")
continue

# Distribute IDs back to files based on chunk counts and prepare for bulk update
start_idx = 0
files_to_update = []
for file, chunk_count in zip(batch, file_text_counts):
end_idx = start_idx + chunk_count
file_ids = batch_ids[start_idx:end_idx]
file.doc_ids = file_ids
file.in_milvus = True
files_to_update.append(file)
start_idx = end_idx
doc_ids.extend(file_ids)

# Bulk update all files in this batch
GithubFile.objects.bulk_update(files_to_update, ['doc_ids', 'in_milvus'])

except Exception as e:
logger.error(f"Error writing file {file.path} to Milvus: {str(e)}")
continue
logger.error(f"Error writing batch to Milvus: {str(e)}")
continue

self.doc_ids = doc_ids
else:
@@ -1341,6 +1443,12 @@ def delete_from_milvus(self):
from core.milvus_utils import delete_vectors
collection_name = settings.GITHUB_REPO_CODE_COLLECTION_NAME
delete_vectors(collection_name, self.doc_ids)

data_source = self.data_source
for doc_id in self.doc_ids:
data_source.doc_ids.remove(doc_id)
data_source.save()

self.in_milvus = False
self.doc_ids = []
self.save()
@@ -1404,16 +1512,26 @@ class Status(models.TextChoices):
STOPPED = "STOPPED", "Stopped"
FAILED = "FAILED", "Failed"

class Source(models.TextChoices):
UI = "UI", "User Interface"
API = "API", "API"

url = models.URLField(max_length=2000)
status = models.CharField(
max_length=50,
choices=Status.choices,
default=Status.RUNNING,
)
source = models.CharField(
max_length=30,
choices=Source.choices,
default=Source.API,
)
discovered_urls = models.JSONField(default=list)
error_message = models.TextField(blank=True, null=True)
start_time = models.DateTimeField(auto_now_add=True)
end_time = models.DateTimeField(null=True, blank=True)
last_polled_at = models.DateTimeField(auto_now_add=True)
link_limit = models.IntegerField(default=1500)
guru_type = models.ForeignKey(GuruType, on_delete=models.CASCADE)
user = models.ForeignKey(User, on_delete=models.CASCADE, null=True, blank=True) # null on selfhosted
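
The reworked write_to_milvus above embeds an entire batch of GitHub files in one call and then slices the flat list of returned vector IDs back onto the individual GithubFile rows by chunk count. A stripped-down, standalone sketch of just that redistribution step, with dummy IDs standing in for the Milvus response:

def distribute_ids(batch_ids, file_text_counts):
    # Each file contributed file_text_counts[i] chunks, in order, so the flat
    # list of inserted IDs is cut back into per-file slices the same way
    # write_to_milvus assigns file.doc_ids.
    per_file_ids = []
    start_idx = 0
    for chunk_count in file_text_counts:
        end_idx = start_idx + chunk_count
        per_file_ids.append(batch_ids[start_idx:end_idx])
        start_idx = end_idx
    return per_file_ids

# Example: three files split into 2, 1 and 3 chunks respectively.
assert distribute_ids([10, 11, 12, 13, 14, 15], [2, 1, 3]) == [[10, 11], [12], [13, 14, 15]]
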
2 changes: 1 addition & 1 deletion src/gurubase-backend/backend/core/serializers.py
@@ -157,7 +157,7 @@ def to_representation(self, instance):
class CrawlStateSerializer(serializers.ModelSerializer):
class Meta:
model = CrawlState
fields = ['id', 'url', 'status', 'guru_type', 'discovered_urls', 'start_time', 'end_time', 'link_limit']
fields = ['id', 'url', 'status', 'guru_type', 'discovered_urls', 'start_time', 'end_time']

def to_representation(self, instance):
repr = super().to_representation(instance)
23 changes: 21 additions & 2 deletions src/gurubase-backend/backend/core/services/data_source_service.py
@@ -8,7 +8,7 @@


class DataSourceService:
"""Service layer for data source operations"""
"""Service layer for data source CRUD operations"""

def __init__(self, guru_type_object: GuruType, user):
self.guru_type_object = guru_type_object
@@ -156,4 +156,23 @@ def reindex_data_sources(self, datasource_ids: List[str]) -> None:
for datasource in datasources:
datasource.reindex()

data_source_retrieval.delay(guru_type_slug=self.guru_type_object.slug)
data_source_retrieval.delay(guru_type_slug=self.guru_type_object.slug)

def delete_data_sources(self, datasource_ids: List[str]) -> None:
"""
Deletes specified data sources
Args:
datasource_ids: List of data source IDs to delete
Raises:
ValueError: If no valid data sources found
"""
if not datasource_ids:
raise ValueError('No data sources provided')

data_sources = DataSource.objects.filter(guru_type=self.guru_type_object, id__in=datasource_ids)
if not data_sources:
raise ValueError('No data sources found to delete')

data_sources.delete()
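
For reference, a minimal usage sketch of the new delete_data_sources method, assuming a caller (for example a view) that has already resolved the GuruType and user. The wrapper below is hypothetical and not part of this commit; it only shows how the service's ValueError contract might be consumed.

from core.models import GuruType
from core.services.data_source_service import DataSourceService

def delete_sources_for_guru(guru_slug: str, user, datasource_ids):
    # Hypothetical wrapper: resolve the guru type, delegate to the service,
    # and translate its ValueError into an error payload.
    guru_type_object = GuruType.objects.get(slug=guru_slug)
    service = DataSourceService(guru_type_object, user)
    try:
        service.delete_data_sources(datasource_ids)
    except ValueError as e:
        return {"msg": str(e)}, 400
    return {"msg": f"Deleted {len(datasource_ids)} data sources"}, 200
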
4 changes: 2 additions & 2 deletions src/gurubase-backend/backend/core/signals.py
@@ -779,7 +779,7 @@ def manage_github_repo_datasource(sender, instance, **kwargs):
status=DataSource.Status.NOT_PROCESSED
)

data_source_retrieval.delay(guru_type_slug=instance.slug)
data_source_retrieval.delay(guru_type_slug=instance.slug, countdown=1)

# Case 2: Either URL is empty or index_repo is False - Delete DataSource
elif existing_datasource:
@@ -790,7 +790,7 @@ def data_source_retrieval_on_creation(sender, instance: DataSource, created, **k
from core.tasks import data_source_retrieval

if created and instance.status == DataSource.Status.NOT_PROCESSED:
data_source_retrieval.delay(guru_type_slug=instance.guru_type.slug)
data_source_retrieval.delay(guru_type_slug=instance.guru_type.slug, countdown=1)

@receiver(pre_save, sender=Integration)
def create_api_key_for_integration(sender, instance, **kwargs):
44 changes: 34 additions & 10 deletions src/gurubase-backend/backend/core/tasks.py
@@ -13,7 +13,7 @@
from core.data_sources import fetch_data_source_content, get_internal_links
from core.requester import GuruRequester, OpenAIRequester
from core.guru_types import get_guru_type_names, get_guru_type_object, get_guru_types_dict
from core.models import DataSource, Favicon, GuruType, LLMEval, LinkReference, LinkValidity, Question, RawQuestion, RawQuestionGeneration, Summarization, SummaryQuestionGeneration, LLMEvalResult, GuruType, GithubFile
from core.models import DataSource, Favicon, GuruType, LLMEval, LinkReference, LinkValidity, Question, RawQuestion, RawQuestionGeneration, Summarization, SummaryQuestionGeneration, LLMEvalResult, GuruType, GithubFile, CrawlState
from core.utils import finalize_data_source_summarizations, embed_texts, generate_questions_from_summary, generate_similar_questions, get_links, get_llm_usage, get_milvus_client, get_more_seo_friendly_title, get_most_similar_questions, guru_type_has_enough_generated_questions, create_guru_type_summarization, simulate_summary_and_answer, validate_guru_type, vector_db_fetch, with_redis_lock, generate_og_image, parse_context_from_prompt, get_default_settings, send_question_request_for_cloudflare_cache, send_guru_type_request_for_cloudflare_cache
from django.conf import settings
import time
@@ -22,6 +22,8 @@
from django.db.models import Q, Avg, StdDev, Count, Sum, Exists, OuterRef
from statistics import median, mean, stdev
from django.utils import timezone
from django.utils.timezone import now
from datetime import timedelta


logger = logging.getLogger(__name__)
@@ -287,7 +289,7 @@ def process_title(title):
logger.info('Processed titles')

@shared_task
def data_source_retrieval(guru_type_slug=None):
def data_source_retrieval(guru_type_slug=None, countdown=0):
logger.info('Data source retrieval')

@with_redis_lock(
@@ -296,6 +298,10 @@ def data_source_retrieval(guru_type_slug=None):
settings.DATA_SOURCE_RETRIEVAL_LOCK_DURATION_SECONDS,
)
def process_guru_type_data_sources(guru_type_slug, is_github=False):
if not is_github and countdown > 0:
# Wait for a bit for the data sources to be synced
time.sleep(countdown)

guru_type_object = get_guru_type_object(guru_type_slug)
if is_github:
data_sources = DataSource.objects.filter(
@@ -1429,17 +1435,13 @@ def process_guru_type(guru_type):
# Create new files in DB
if files_to_create:
created_files = GithubFile.objects.bulk_create(files_to_create)
logger.info(f"Created {len(created_files)} files for data source {data_source.id}")

# Write new files to Milvus
for file in created_files:
try:
file.write_to_milvus()
except Exception as e:
logger.error(f"Error writing file {file.path} to Milvus: {str(e)}")
logger.info(f"Created {len(created_files)} files for data source {str(data_source)}")

# Update data source timestamp
data_source.save() # This will update date_updated

data_source.in_milvus = False
data_source.write_to_milvus()

finally:
# Clean up
@@ -1518,3 +1520,25 @@ def crawl_website(url: str, crawl_state_id: int, link_limit: int = 1500):
crawl_state.save()
except Exception as e:
logger.error(f"Error updating crawl state: {str(e)}", exc_info=True)

@shared_task
def stop_inactive_ui_crawls():
"""
Periodic task to stop UI crawls that haven't been polled for longer than CRAWL_INACTIVE_THRESHOLD_SECONDS
"""
threshold_seconds = settings.CRAWL_INACTIVE_THRESHOLD_SECONDS
inactivity_threshold = now() - timedelta(seconds=threshold_seconds)

inactive_crawls = CrawlState.objects.filter(
source=CrawlState.Source.UI,
status=CrawlState.Status.RUNNING,
last_polled_at__lt=inactivity_threshold
)

for crawl in inactive_crawls:
crawl.status = CrawlState.Status.STOPPED
crawl.end_time = now()
crawl.error_message = f"Crawl automatically stopped due to inactivity (no status checks for over {threshold_seconds} seconds)"
crawl.save()

return f"Stopped {inactive_crawls.count()} inactive UI crawls"