Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for Instagram #1001

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 82 additions & 30 deletions snscrape/modules/instagram.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ class InstagramPost(snscrape.base.Item):
comments: int
commentsDisabled: bool
isVideo: bool
videoUrl: typing.Optional[str]
id: str

# The string form of a post is simply its `url` attribute.
def __str__(self):
return self.url
Expand All @@ -53,24 +55,28 @@ def __init__(self, **kwargs):
super().__init__(**kwargs)
self._headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}
self._initialPage = None
self._api_url = None
0bmay marked this conversation as resolved.
Show resolved Hide resolved

def _response_to_items(self, response):
for node in response[self._responseContainer][self._edgeXToMedia]['edges']:
for node in response[self._edgeXToMedia]['edges']:
code = node['node']['shortcode']
username = node['node']['owner']['username'] if 'username' in node['node']['owner'] else None
url = f'https://www.instagram.com/p/{code}/'

yield InstagramPost(
url = url,
date = datetime.datetime.fromtimestamp(node['node']['taken_at_timestamp'], datetime.timezone.utc),
content = node['node']['edge_media_to_caption']['edges'][0]['node']['text'] if len(node['node']['edge_media_to_caption']['edges']) else None,
thumbnailUrl = node['node']['thumbnail_src'],
displayUrl = node['node']['display_url'],
username = username,
likes = node['node']['edge_media_preview_like']['count'],
comments = node['node']['edge_media_to_comment']['count'],
commentsDisabled = node['node']['comments_disabled'],
isVideo = node['node']['is_video'],
)
url=url,
0bmay marked this conversation as resolved.
Show resolved Hide resolved
date=datetime.datetime.fromtimestamp(node['node']['taken_at_timestamp'], datetime.timezone.utc),
content=node['node']['edge_media_to_caption']['edges'][0]['node']['text'] if len(node['node']['edge_media_to_caption']['edges']) else None,
thumbnailUrl=node['node']['thumbnail_src'],
displayUrl=node['node']['display_url'],
username=username,
likes=node['node']['edge_media_preview_like']['count'],
comments=node['node']['edge_media_to_comment']['count'],
commentsDisabled=node['node']['comments_disabled'],
isVideo=node['node']['is_video'],
videoUrl=node['node']['video_url'] if 'video_url' in node['node'] else None,
id=node['node']['id'],
)

def _initial_page(self):
if self._initialPage is None:
Expand All @@ -80,18 +86,29 @@ def _initial_page(self):
raise snscrape.base.ScraperException(f'Got status code {r.status_code}')
elif r.url.startswith('https://www.instagram.com/accounts/login/'):
raise snscrape.base.ScraperException('Redirected to login page')
r = self._get(
self._api_url,
headers=self._headers,
responseOkCallback=self._check_json_callback
)
self._initialPage = r

return self._initialPage

def _check_initial_page_callback(self, r):
if r.status_code != 200:
return True, None
jsonData = r.text.split('<script type="text/javascript">window._sharedData = ')[1].split(';</script>')[0] # May throw an IndexError if Instagram changes something again; we just let that bubble.
try:
obj = json.loads(jsonData)
except json.JSONDecodeError:
return False, 'invalid JSON'
r._snscrape_json_obj = obj
if (match := re.search(
r'\\"csrf_token\\":\\"([\da-zA-Z]+)\\",',
r.text)):
_logger.debug('Found csrf token in HTML')
self._headers['X-Csrftoken'] = match.group(1)
if (match := re.search(
r'"X-IG-App-ID":"(\d+)"',
r.text)):
_logger.debug('Found X-IG-App-ID token in HTML')
self._headers['X-IG-App-ID'] = match.group(1)

return True, None

def _check_json_callback(self, r):
Expand All @@ -112,25 +129,22 @@ def get_items(self):
_logger.warning('Page does not exist')
return
response = r._snscrape_json_obj
rhxGis = response['rhx_gis'] if 'rhx_gis' in response else ''
if response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._edgeXToMedia]['count'] == 0:
if response['data'][self._responseContainer][self._edgeXToMedia]['count'] == 0:
_logger.info('Page has no posts')
return
if not response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._edgeXToMedia]['edges']:
if not response['data'][self._responseContainer][self._edgeXToMedia]['edges']:
_logger.warning('Private account')
return
pageID = response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._pageIDKey]
yield from self._response_to_items(response['entry_data'][self._pageName][0]['graphql'])
if not response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._edgeXToMedia]['page_info']['has_next_page']:
pageID = response['data'][self._responseContainer][self._pageIDKey]
yield from self._response_to_items(response['data'][self._responseContainer])
if not response['data'][self._responseContainer][self._edgeXToMedia]['page_info']['has_next_page']:
return
endCursor = response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._edgeXToMedia]['page_info']['end_cursor']
endCursor = response['data'][self._responseContainer][self._edgeXToMedia]['page_info']['end_cursor']

headers = self._headers.copy()
while True:
_logger.info(f'Retrieving endCursor = {endCursor!r}')
variables = self._variablesFormat.format(**locals())
headers['X-Requested-With'] = 'XMLHttpRequest'
headers['X-Instagram-GIS'] = hashlib.md5(f'{rhxGis}:{variables}'.encode('utf-8')).hexdigest()
r = self._get(f'https://www.instagram.com/graphql/query/?query_hash={self._queryHash}&variables={variables}', headers = headers, responseOkCallback = self._check_json_callback)

if r.status_code != 200:
Expand All @@ -139,7 +153,7 @@ def get_items(self):
response = r._snscrape_json_obj
if not response['data'][self._responseContainer][self._edgeXToMedia]['edges']:
return
yield from self._response_to_items(response['data'])
yield from self._response_to_items(response['data'][self._responseContainer])
if not response['data'][self._responseContainer][self._edgeXToMedia]['page_info']['has_next_page']:
return
endCursor = response['data'][self._responseContainer][self._edgeXToMedia]['page_info']['end_cursor']
Expand All @@ -157,6 +171,7 @@ def __init__(self, username, **kwargs):
self._pageIDKey = 'id'
self._queryHash = 'f2405b236d85e8296cf30347c9f08c2a'
self._variablesFormat = '{{"id":"{pageID}","first":50,"after":"{endCursor}"}}'
self._api_url = f'https://www.instagram.com/api/v1/users/web_profile_info/?username={username}'

def _get_entity(self):
r = self._initial_page()
Expand Down Expand Up @@ -212,6 +227,7 @@ def __init__(self, hashtag, **kwargs):
self._pageIDKey = 'name'
self._queryHash = 'f92f56d47dc7a55b606908374b43a314'
self._variablesFormat = '{{"tag_name":"{pageID}","first":50,"after":"{endCursor}"}}'
self._api_url = f'https://www.instagram.com/api/v1/tags/logged_out_web_info/?tag_name={hashtag.lower()}'

@classmethod
def _cli_setup_parser(cls, subparser):
Expand All @@ -229,11 +245,13 @@ def __init__(self, locationId, **kwargs):
super().__init__(**kwargs)
self._initialUrl = f'https://www.instagram.com/explore/locations/{locationId}/'
self._pageName = 'LocationsPage'
self._responseContainer = 'location'
self._responseContainer = 'recent'
self._edgeXToMedia = 'edge_location_to_media'
self._pageIDKey = 'id'
self._pageIDKey = 'next_page'
self._queryHash = '1b84447a4d8b6d6d0426fefb34514485'
self._variablesFormat = '{{"id":"{pageID}","first":50,"after":"{endCursor}"}}'
self._api_url = f"https://www.instagram.com/api/v1/locations/web_info/?location_id={locationId}"
self._locationId = locationId

@classmethod
def _cli_setup_parser(cls, subparser):
Expand All @@ -242,3 +260,37 @@ def _cli_setup_parser(cls, subparser):
# CLI hook: delegates to `cls._cli_construct`, passing the parsed `args`
# and the location ID read from `args.locationid`.
@classmethod
def _cli_from_args(cls, args):
return cls._cli_construct(args, args.locationid)

def get_items(self):
    """Yield the posts contained in the location's initial API response.

    The initial page is fetched through ``self._initial_page()``; any
    exception it raises propagates unchanged. Nothing is yielded (a
    message is logged instead) when the response status is 404 or when
    the container selected by ``self._responseContainer`` has no sections.

    Yields:
        Whatever ``self._response_to_items`` produces for the container.
    """
    r = self._initial_page()
    if r.status_code == 404:
        _logger.warning('Page does not exist')
        return

    # Resolve the nested container once instead of repeating the lookup.
    container = r._snscrape_json_obj['native_location_data'][self._responseContainer]

    # Truthiness covers an empty list as well as a null value, which
    # `len(...) == 0` would have rejected with a TypeError. A missing
    # 'sections' key still raises KeyError so schema changes stay visible.
    if not container['sections']:
        _logger.info('Page has no posts')
        return

    # Querying for more data returns the login page, so this single set
    # of posts is all that can be retrieved.
    yield from self._response_to_items(container)

def _response_to_items(self, response):
    """Convert one container of the location API response into posts.

    Args:
        response: Mapping with a 'sections' list. Each section's
            ``['layout_content']['medias']`` entries wrap the actual post
            data under the 'media' key.

    Yields:
        InstagramPost: one item per media entry, in response order.

    Raises:
        KeyError: if a required field (e.g. 'code', 'taken_at',
            'image_versions2', 'like_count') is absent from a post.
    """
    for section in response['sections']:
        for entry in section['layout_content']['medias']:
            post = entry['media']
            candidates = post['image_versions2']['candidates']
            # A missing or null caption both mean "no text".
            caption = post.get('caption')
            # A missing, null or empty 'video_versions' all mean "no video";
            # indexing [0] unguarded would raise on an empty list.
            videos = post.get('video_versions')

            yield InstagramPost(
                url=f"https://www.instagram.com/p/{post['code']}/",
                date=datetime.datetime.fromtimestamp(post['taken_at'], datetime.timezone.utc),
                content=caption['text'] if caption else None,
                # NOTE(review): last candidate is used as the thumbnail and
                # the first as the display image -- presumably the list is
                # ordered largest-first; confirm against the API response.
                thumbnailUrl=candidates[-1]['url'],
                displayUrl=candidates[0]['url'],
                username=post['user'].get('username'),
                likes=post['like_count'],
                comments=post['comment_count'],
                # Hard-coded: this code path does not read a
                # comments-disabled flag from the response.
                commentsDisabled=False,
                isVideo=bool(videos),
                videoUrl=videos[0]['url'] if videos else None,
                id=post['id'],
            )