from typing import Callable
from fp.fp import FreeProxy
import random
import logging
import time
import requests
import httpx
import tempfile
import urllib3
from selenium import webdriver
from selenium.webdriver.support.wait import WebDriverWait, TimeoutException
from selenium.webdriver.common.by import By
from selenium.common.exceptions import WebDriverException, UnexpectedAlertPresentException
from selenium.webdriver.firefox.options import Options as FirefoxOptions
from urllib.parse import urlparse
from contextlib import contextmanager
from deprecated import deprecated
try:
import stem.process
from stem import Signal
from stem.control import Controller
except ImportError:
stem = None
try:
from fake_useragent import UserAgent
FAKE_USERAGENT = True
except Exception:
FAKE_USERAGENT = False
DEFAULT_USER_AGENT = 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.149 Safari/537.36'
from .data_types import ProxyMode
class DOSException(Exception):
    """Raised when the page shows the marker of a detected DOS attack."""
class MaxTriesExceededException(Exception):
    """Raised when scholarly has used up its maximum number of tries."""
[docs]class ProxyGenerator(object):
def __init__(self):
    """Initialise the proxy bookkeeping and open the first session."""
    self.logger = logging.getLogger('scholarly')

    # Proxy state; ``_proxy_works`` becomes True once a proxy or Tor is in use.
    self._proxy_gen = None
    self._proxy_works = False
    self.proxy_mode = None
    self._proxies = {}

    # Tor state; ``_can_refresh_tor`` becomes True when we control a Tor
    # server whose identity can be refreshed.
    self._tor_process = None
    self._can_refresh_tor = False
    self._tor_control_port = None
    self._tor_password = None

    # Connection state; the session is created last because _new_session()
    # reads the attributes set above.
    self._session = None
    self._webdriver = None
    self._TIMEOUT = 5
    self._new_session()
def __del__(self):
if self._tor_process:
self._tor_process.kill()
self._tor_process.wait()
self._close_session()
[docs] def get_session(self):
return self._session
[docs] def Luminati(self, usr, passwd, proxy_port):
""" Setups a luminati proxy without refreshing capabilities.
:param usr: scholarly username, optional by default None
:type usr: string
:param passwd: scholarly password, optional by default None
:type passwd: string
:param proxy_port: port for the proxy,optional by default None
:type proxy_port: integer
:returns: whether or not the proxy was set up successfully
:rtype: {bool}
:Example::
>>> pg = ProxyGenerator()
>>> success = pg.Luminati(usr = foo, passwd = bar, port = 1200)
"""
if (usr is not None and passwd is not None and proxy_port is not None):
username = usr
password = passwd
port = proxy_port
else:
self.logger.warning("Not enough parameters were provided for the Luminati proxy. Reverting to a local connection.")
return
session_id = random.random()
proxy = f"http://{username}-session-{session_id}:{password}@zproxy.lum-superproxy.io:{port}"
proxy_works = self._use_proxy(http=proxy, https=proxy)
if proxy_works:
self.logger.info("Luminati proxy setup successfully")
self.proxy_mode = ProxyMode.LUMINATI
else:
self.logger.warning("Luminati does not seem to work. Reason unknown.")
return proxy_works
def SingleProxy(self, http=None, https=None):
    """
    Use a proxy of your choice.

    :param http: http proxy address
    :type http: string
    :param https: https proxy address
    :type https: string
    :returns: whether or not the proxy was set up successfully
    :rtype: {bool}

    :Example::
        >>> pg = ProxyGenerator()
        >>> success = pg.SingleProxy(http = <http proxy address>, https = <https proxy address>)
    """
    self.logger.info("Enabling proxies: http=%s https=%s", http, https)
    proxy_works = self._use_proxy(http=http, https=https)
    if not proxy_works:
        self.logger.warning("Unable to setup the proxy: http=%s https=%s. Reason unknown." , http, https)
        return proxy_works
    # The mode is only recorded once the proxy has been verified.
    self.proxy_mode = ProxyMode.SINGLEPROXY
    self.logger.info("Proxy setup successfully")
    return proxy_works
def _check_proxy(self, proxies) -> bool:
    """Check whether a proxy is working by fetching http://httpbin.org/ip.

    :param proxies: A dictionary with the urls of the proxies, keyed either
                    {'http://': url1, 'https://': url1} (as built elsewhere
                    in this class) or {'http': url1, 'https': url1}
    :returns: whether the proxy is working or not
    :rtype: {bool}
    """
    # requests picks a proxy by bare scheme ("http") or "scheme://host".
    # A key such as "http://" matches neither, so it would be ignored and
    # the check would go out directly instead of through the proxy.
    requests_proxies = {
        (key[:-3] if key.endswith("://") else key): proxy_url
        for key, proxy_url in proxies.items()
    }
    with requests.Session() as session:
        session.proxies = requests_proxies
        try:
            resp = session.get("http://httpbin.org/ip", timeout=self._TIMEOUT)
            if resp.status_code == 200:
                self.logger.info("Proxy works! IP address: %s",
                                 resp.json()["origin"])
                return True
            elif resp.status_code == 401:
                self.logger.warning("Incorrect credentials for proxy!")
                return False
        except (TimeoutException, TimeoutError):
            time.sleep(self._TIMEOUT)
        except Exception as e:
            # Failure is common and expected with free proxy.
            # Do not log at warning level and annoy users.
            level = logging.DEBUG if self.proxy_mode is ProxyMode.FREE_PROXIES else logging.WARNING
            self.logger.log(level, "Exception while testing proxy: %s", e)
            if self.proxy_mode in (ProxyMode.LUMINATI, ProxyMode.SCRAPERAPI):
                self.logger.warning("Double check your credentials and try increasing the timeout")
        # Any other status code, a timeout or an exception counts as failure.
        return False
def _refresh_tor_id(self, tor_control_port: int, password: str) -> bool:
"""Refreshes the id by using a new Tor node.
:returns: Whether or not the refresh was succesful
:rtype: {bool}
"""
try:
with Controller.from_port(port=tor_control_port) as controller:
if password:
controller.authenticate(password=password)
else:
controller.authenticate()
controller.signal(Signal.NEWNYM)
self._new_session()
return (True, self._session)
except Exception as e:
err = f"Exception {e} while refreshing TOR. Retrying..."
self.logger.info(err)
return (False, None)
def _use_proxy(self, http: str, https: str = None) -> bool:
    """Allows user to set their own proxy for the connection session.

    The proxy is verified first; only if it works are the proxies stored
    and a new session opened.

    :param http: the http proxy; ``http://`` is prepended when the address
                 carries no scheme
    :type http: str
    :param https: the https proxy (default to the same as http);
                  ``https://`` is prepended when the address carries no scheme
    :type https: str
    :returns: whether or not the proxy was set up successfully
    :rtype: {bool}
    """
    def with_scheme(address, default_scheme):
        # Only add a scheme when none is present. Checking for a literal
        # "http"/"https" prefix used to turn "socks5://host" into
        # "http://socks5://host" and an "http://host" https-proxy into
        # "https://http://host".
        if "://" in address:
            return address
        return f"{default_scheme}://{address}"

    if http is None:
        if https is None:
            self.logger.warning("No proxy address was provided.")
            return False
        # Only the https proxy was given: use the same address for http.
        http = https

    http = with_scheme(http, "http")
    https = http if https is None else with_scheme(https, "https")

    proxies = {'http://': http, 'https://': https}
    if self.proxy_mode == ProxyMode.SCRAPERAPI:
        # For ScraperAPI the account endpoint is queried instead of
        # testing the proxy itself.
        r = requests.get("http://api.scraperapi.com/account", params={'api_key': self._API_KEY}).json()
        if "error" in r:
            self.logger.warning(r["error"])
            self._proxy_works = False
        else:
            # Convert once so that both the comparison and the %d log
            # formatting receive an integer.
            request_limit = int(r["requestLimit"])
            self._proxy_works = r["requestCount"] < request_limit
            self.logger.info("Successful ScraperAPI requests %d / %d",
                             r["requestCount"], request_limit)
    else:
        self._proxy_works = self._check_proxy(proxies)

    if self._proxy_works:
        self._proxies = proxies
        self._new_session(proxies=proxies)

    return self._proxy_works
@deprecated(version='1.5', reason="Tor methods are deprecated and are not actively tested.")
def Tor_External(self, tor_sock_port: int, tor_control_port: int, tor_password: str):
    """
    Set up a Tor proxy. A tor service should already be running on the system.
    Otherwise you might want to use Tor_Internal.

    :param tor_sock_port: the port where the Tor sock proxy is running
    :type tor_sock_port: int
    :param tor_control_port: the port where the Tor control server is running
    :type tor_control_port: int
    :param tor_password: the password for the Tor control server
    :type tor_password: str
    :returns: a dictionary with the keys ``proxy_works``, ``refresh_works``,
              ``tor_control_port`` and ``tor_sock_port``
    :rtype: {dict}

    :Example::
        pg = ProxyGenerator()
        pg.Tor_External(tor_sock_port = 9050, tor_control_port = 9051, tor_password = "scholarly_password")

    Note: This method is deprecated since v1.5
    """
    if stem is None:
        raise RuntimeError("Tor methods are not supported with basic version of the package. "
                           "Please install scholarly[tor] to use this method.")

    # Setting requests timeout to be reasonably long
    # to accommodate slowness of the Tor network
    self._TIMEOUT = 10

    socks_proxy = f"socks5://127.0.0.1:{tor_sock_port}"
    self._use_proxy(http=socks_proxy, https=socks_proxy)

    refresh_works, _ = self._refresh_tor_id(tor_control_port, tor_password)
    self._can_refresh_tor = refresh_works
    # The control credentials are only kept when a refresh actually worked.
    self._tor_control_port = tor_control_port if refresh_works else None
    self._tor_password = tor_password if refresh_works else None

    self.proxy_mode = ProxyMode.TOR_EXTERNAL

    return {
        "proxy_works": self._proxy_works,
        "refresh_works": self._can_refresh_tor,
        "tor_control_port": tor_control_port,
        "tor_sock_port": tor_sock_port
    }
@deprecated(version='1.5', reason="Tor methods are deprecated and are not actively tested")
def Tor_Internal(self, tor_cmd=None, tor_sock_port=None, tor_control_port=None):
    """
    Starts a Tor client running in a scholarly-specific port, together with a scholarly-specific control port.
    If no arguments are passed for the tor_sock_port and the tor_control_port they are automatically generated in the following ranges
    - tor_sock_port: (9000, 9500)
    - tor_control_port: (9500, 9999)

    :param tor_cmd: tor executable location (absolute path if its not exported in PATH)
    :type tor_cmd: string
    :param tor_sock_port: tor socket port
    :type tor_sock_port: int
    :param tor_control_port: tor control port
    :type tor_control_port: int
    :returns: a dictionary with the keys ``proxy_works``, ``refresh_works``,
              ``tor_control_port`` and ``tor_sock_port``
    :rtype: {dict}
    :raises RuntimeError: if the ``stem`` package is not available

    :Example::
        pg = ProxyGenerator()
        pg.Tor_Internal(tor_cmd = 'tor')

    Note: This method is deprecated since v1.5
    """
    if stem is None:
        raise RuntimeError("Tor methods are not supported with basic version of the package. "
                           "Please install scholarly[tor] to use this method.")

    self.logger.info("Attempting to start owned Tor as the proxy")

    if tor_cmd is None:
        self.logger.info("No tor_cmd argument passed. This should point to the location of Tor executable.")
        return {
            "proxy_works": False,
            "refresh_works": False,
            "tor_control_port": None,
            "tor_sock_port": None
        }

    if tor_sock_port is None:
        # Picking a random port to avoid conflicts
        # with simultaneous runs of scholarly
        tor_sock_port = random.randrange(9000, 9500)

    if tor_control_port is None:
        # Picking a random port to avoid conflicts
        # with simultaneous runs of scholarly
        tor_control_port = random.randrange(9500, 9999)

    # TODO: Check that the launched Tor process stops after scholar is done
    self._tor_process = stem.process.launch_tor_with_config(
        tor_cmd=tor_cmd,
        config={
            'ControlPort': str(tor_control_port),
            'SocksPort': str(tor_sock_port),
            'DataDirectory': tempfile.mkdtemp()
            # TODO Perhaps we want to also set a password here
        },
        # take_ownership=True # Taking this out for now, as it seems to cause trouble
    )
    self.proxy_mode = ProxyMode.TOR_INTERNAL
    result = self.Tor_External(tor_sock_port, tor_control_port, tor_password=None)
    # Tor_External records TOR_EXTERNAL unconditionally; restore the mode
    # that describes the Tor process launched here.
    self.proxy_mode = ProxyMode.TOR_INTERNAL
    return result
def _has_captcha(self, got_id, got_class) -> bool:
_CAPTCHA_IDS = [
"gs_captcha_ccl", # the normal captcha div
"recaptcha", # the form used on full-page captchas
"captcha-form", # another form used on full-page captchas
]
_DOS_CLASSES = [
"rc-doscaptcha-body",
]
if any([got_class(c) for c in _DOS_CLASSES]):
raise DOSException()
return any([got_id(i) for i in _CAPTCHA_IDS])
def _webdriver_has_captcha(self) -> bool:
    """Tests whether the current webdriver page contains a captcha.

    :returns: whether or not the site contains a captcha
    :rtype: {bool}
    """
    def count_matches(by, value):
        # The webdriver is fetched on every probe, as _get_webdriver()
        # may hand back a new instance.
        return len(self._get_webdriver().find_elements(by, value))

    return self._has_captcha(
        lambda element_id: count_matches(By.ID, element_id) > 0,
        lambda class_name: count_matches(By.CLASS_NAME, class_name) > 0,
    )
def _get_webdriver(self):
if self._webdriver:
try:
_ = self._webdriver.current_url
return self._webdriver
except Exception as e:
self.logger.debug(e)
try:
return self._get_firefox_webdriver()
except Exception as err:
self.logger.debug("Cannot open Firefox/Geckodriver: %s", err)
try:
return self._get_chrome_webdriver()
except Exception as err:
self.logger.debug("Cannot open Chrome: %s", err)
self.logger.info("Neither Chrome nor Firefox/Geckodriver found in PATH")
def _get_chrome_webdriver(self):
    """Start a headless Chrome webdriver and pre-load Google Scholar.

    When a proxy is in use, it is registered in the Chrome desired
    capabilities before the browser is started.

    :returns: the newly created webdriver, also stored in ``self._webdriver``
    """
    if self._proxy_works:
        # self._proxies is keyed "http://" / "https://" (see _use_proxy), so
        # indexing it with "http" / "https" raised KeyError. Accept both
        # spellings and fall back to the http proxy when no https entry exists.
        http_proxy = self._proxies.get('http://', self._proxies.get('http'))
        ssl_proxy = self._proxies.get('https://', self._proxies.get('https', http_proxy))
        webdriver.DesiredCapabilities.CHROME['proxy'] = {
            "httpProxy": http_proxy,
            "sslProxy": ssl_proxy,
            "proxyType": "MANUAL"
        }

    options = webdriver.ChromeOptions()
    options.add_argument('--headless')
    self._webdriver = webdriver.Chrome('chromedriver', options=options)
    self._webdriver.get("https://scholar.google.com")  # Need to pre-load to set cookies later

    return self._webdriver
def _get_firefox_webdriver(self):
    """Start a headless Firefox webdriver and pre-load Google Scholar.

    When a proxy is in use, it is registered in the Firefox desired
    capabilities before the browser is started.

    :returns: the newly created webdriver, also stored in ``self._webdriver``
    """
    if self._proxy_works:
        # Redirect webdriver through proxy.
        # self._proxies is keyed "http://" / "https://" (see _use_proxy), so
        # indexing it with "http" / "https" raised KeyError. Accept both
        # spellings and fall back to the http proxy when no https entry exists.
        http_proxy = self._proxies.get('http://', self._proxies.get('http'))
        ssl_proxy = self._proxies.get('https://', self._proxies.get('https', http_proxy))
        webdriver.DesiredCapabilities.FIREFOX['proxy'] = {
            "httpProxy": http_proxy,
            "sslProxy": ssl_proxy,
            "proxyType": "MANUAL",
        }

    options = FirefoxOptions()
    options.add_argument('--headless')
    self._webdriver = webdriver.Firefox(options=options)
    self._webdriver.get("https://scholar.google.com")  # Need to pre-load to set cookies later

    # It might make sense to (pre)set cookies as well, e.g., to set a GSP ID.
    # However, a limitation of webdriver makes it impossible to set cookies for
    # domains other than the current active one, cf. https://github.com/w3c/webdriver/issues/1238
    # Therefore setting cookies in the session instance for other domains than the one set above
    # (e.g., via self._session.cookies.set) will create problems when transferring them to the
    # webdriver when handling captchas.

    return self._webdriver
def _handle_captcha2(self, url):
    """Open ``url`` in the webdriver and wait until no captcha is detected.

    Session cookies matching the webdriver's current host are copied to the
    webdriver first; once the captcha is gone, the webdriver's cookies are
    copied back into the session.

    :param url: the page to load in the webdriver
    :returns: the session, updated with the webdriver's cookies
    :raises TimeoutException: if the captcha is still present after the
                              maximum waiting time (one week)
    :raises DOSException: if the DOS marker is detected while waiting
    :raises WebDriverException: if the browser stops responding
    """
    cur_host = urlparse(self._get_webdriver().current_url).hostname
    # Iterating an httpx ``Cookies`` object yields cookie names, not cookie
    # objects; the objects live in its ``jar``. Fall back to the container
    # itself for cookie jars that already iterate over cookie objects.
    session_cookies = getattr(self._session.cookies, "jar", self._session.cookies)
    for cookie in session_cookies:
        # Only set cookies matching the current domain, cf. https://github.com/w3c/webdriver/issues/1238
        # Compare by value: ``is`` tests identity and is practically never
        # true for two separately built strings.
        if cur_host == cookie.domain.lstrip('.'):
            self._get_webdriver().add_cookie({
                'name': cookie.name,
                'value': cookie.value,
                'path': cookie.path,
                'domain': cookie.domain,
            })
    self._get_webdriver().get(url)

    log_interval = 10
    cur = 0
    timeout = 60*60*24*7  # 1 week
    while cur < timeout:
        try:
            cur = cur + log_interval  # Update before exceptions can happen
            WebDriverWait(self._get_webdriver(), log_interval).until_not(lambda drv: self._webdriver_has_captcha())
            break
        except TimeoutException:
            self.logger.info(f"Solving the captcha took already {cur} seconds (of maximum {timeout} s).")
        except UnexpectedAlertPresentException as e:
            # This can apparently happen when reCAPTCHA has hiccups:
            # "Cannot contact reCAPTCHA. Check your connection and try again."
            self.logger.info(f"Unexpected alert while waiting for captcha completion: {e.args}")
            time.sleep(15)
        except DOSException:
            self.logger.info("Google thinks we are DOSing the captcha.")
            raise
        except WebDriverException:
            self.logger.info("Browser seems to be disfunctional - closed by user?")
            raise
        except Exception as e:
            # TODO: This exception handler should eventually be removed when
            # we know the "typical" (non-error) exceptions that can occur.
            self.logger.info(f"Unhandled {type(e).__name__} while waiting for captcha completion: {e.args}")
    else:
        # Only reached when the loop ran out without hitting ``break``.
        raise TimeoutException(f"Could not solve captcha in time (within {timeout} s).")
    self.logger.info(f"Solved captcha in less than {cur} seconds.")

    for cookie in self._get_webdriver().get_cookies():
        # Forward only name, value, domain and path. Webdriver cookies carry
        # further keys (e.g. "secure") that httpx's ``Cookies.set`` does not
        # accept as keyword arguments.
        self._session.cookies.set(
            cookie["name"],
            cookie["value"],
            domain=cookie.get("domain") or "",
            path=cookie.get("path") or "/",
        )

    return self._session
def _new_session(self, **kwargs):
    """Close the current session (if any) and open a new ``httpx.Client``.

    :param kwargs: extra keyword arguments forwarded to ``httpx.Client``
    :returns: the newly created session, also stored in ``self._session``
    """
    init_kwargs = {"follow_redirects": True, **kwargs}

    # Proxies are carried over only when a previous session existed.
    had_session = bool(self._session)
    proxies = self._proxies if had_session else {}
    if had_session:
        self._close_session()

    self.got_403 = False

    if not FAKE_USERAGENT:
        user_agent = DEFAULT_USER_AGENT
    else:
        # Suppress the misleading traceback from UserAgent()
        with self._suppress_logger('fake_useragent'):
            user_agent = UserAgent().random

    init_kwargs["headers"] = {
        'accept-language': 'en-US,en',
        'accept': 'text/html,application/xhtml+xml,application/xml',
        'User-Agent': user_agent,
    }

    if self._proxy_works:
        init_kwargs["proxies"] = proxies
        self._proxies = proxies
        if self.proxy_mode is ProxyMode.SCRAPERAPI:
            # SSL Certificate verification must be disabled for
            # ScraperAPI requests to work.
            # https://www.scraperapi.com/documentation/
            init_kwargs["verify"] = False

    self._session = httpx.Client(**init_kwargs)
    # The webdriver reference is dropped together with the old session.
    self._webdriver = None

    return self._session
def _close_session(self):
if self._session:
self._session.close()
if self._webdriver:
try:
self._webdriver.quit()
except Exception as e:
self.logger.warning("Could not close webdriver cleanly: %s", e)
def _fp_coroutine(self, timeout=1, wait_time=120):
    """A coroutine to continuously yield free proxies.

    It takes back the proxies that stopped working (the value passed to
    ``send``) and marks them as dirty so they are not yielded again.

    :param timeout: timeout passed to ``FreeProxy``
    :param wait_time: the coroutine ends once this many seconds have passed
                      since it started or since its last yield
    """
    freeproxy = FreeProxy(rand=False, timeout=timeout)
    if not hasattr(self, '_dirty_freeproxies'):
        self._dirty_freeproxies = set()

    def fetch_proxies():
        # The same version-tolerant call is used for the initial fetch and
        # for every refill.
        try:
            return freeproxy.get_proxy_list(repeat=False)  # free-proxy >= 1.1.0
        except TypeError:
            return freeproxy.get_proxy_list()  # free-proxy < 1.1.0

    all_proxies = fetch_proxies()
    all_proxies.reverse()  # Try the older proxies first

    t1 = time.time()
    while (time.time()-t1 < wait_time):
        if not all_proxies:
            all_proxies = fetch_proxies()
            if not all_proxies:
                # Nothing available right now: wait briefly instead of
                # failing with IndexError on pop(), then check the clock.
                time.sleep(1)
                continue
        proxy = all_proxies.pop()
        if proxy in self._dirty_freeproxies:
            continue
        proxies = {'http://': proxy, 'https://': proxy}
        proxy_works = self._check_proxy(proxies)
        if proxy_works:
            # The caller sends back the proxy it wants marked as dirty.
            dirty_proxy = (yield proxy)
            t1 = time.time()
        else:
            dirty_proxy = proxy
        if dirty_proxy is not None:
            # None is sent when priming or when no old proxy is known.
            self._dirty_freeproxies.add(dirty_proxy)
def FreeProxies(self, timeout=1, wait_time=120):
    """
    Sets up continuously rotating proxies from the free-proxy library

    :param timeout: Timeout for a single proxy in seconds, optional
    :type timeout: float
    :param wait_time: Maximum time (in seconds) to wait until newer set of proxies become available at https://sslproxies.org/
    :type wait_time: float
    :returns: whether or not the proxy was set up successfully
    :rtype: {bool}
    :raises MaxTriesExceededException: if no working proxy was found within
                                       the allowed number of tries or before
                                       the proxy generator ran out

    :Example::
        >>> pg = ProxyGenerator()
        >>> success = pg.FreeProxies()
    """
    self.proxy_mode = ProxyMode.FREE_PROXIES
    # FreeProxies is the only mode that is assigned regardless of setup successfully or not.

    self._fp_gen = self._fp_coroutine(timeout=timeout, wait_time=wait_time)
    self._proxy_gen = self._fp_gen.send

    n_retries = 200
    proxy_works = False
    try:
        proxy = self._proxy_gen(None)  # prime the generator
        self.logger.debug("Trying with proxy %s", proxy)
        proxy_works = self._use_proxy(proxy)
        n_tries = 0
        while (not proxy_works) and (n_tries < n_retries):
            # Hand the failed proxy back (marking it dirty) and get the next.
            proxy = self._proxy_gen(proxy)
            self.logger.debug("Trying with proxy %s", proxy)
            proxy_works = self._use_proxy(proxy)
            n_tries += 1
    except StopIteration:
        # The generator ended (its wait time elapsed) without a working proxy.
        proxy_works = False

    # Decide on the outcome itself, not on the try counter: a success on
    # the very last allowed try must not be reported as a failure.
    if proxy_works:
        return True

    n_dirty = len(getattr(self, '_dirty_freeproxies', ()))
    self._fp_gen.close()
    msg = ("None of the free proxies are working at the moment. "
           f"Marked {n_dirty} proxies dirty. Try again after a few minutes."
           )
    raise MaxTriesExceededException(msg)
[docs] def ScraperAPI(self, API_KEY, country_code=None, premium=False, render=False):
"""
Sets up a proxy using ScraperAPI
The optional parameters are only for Business and Enterprise plans with
ScraperAPI. For more details, https://www.scraperapi.com/documentation/
:Example::
>>> pg = ProxyGenerator()
>>> success = pg.ScraperAPI(API_KEY)
:param API_KEY: ScraperAPI API Key value.
:type API_KEY: string
:type country_code: string, optional by default None
:type premium: bool, optional by default False
:type render: bool, optional by default False
:returns: whether or not the proxy was set up successfully
:rtype: {bool}
"""
if API_KEY is None:
raise ValueError("ScraperAPI API Key is required.")
# Get basic account information. This will NOT be counted towards successful API requests.
r = requests.get("http://api.scraperapi.com/account", params={'api_key': API_KEY}).json()
if "error" in r:
self.logger.warning(r["error"])
return False
self._API_KEY = API_KEY
self.proxy_mode = ProxyMode.SCRAPERAPI
r["requestLimit"] = int(r["requestLimit"])
self.logger.info("Successful ScraperAPI requests %d / %d",
r["requestCount"], r["requestLimit"])
# ScraperAPI documentation recommends setting the timeout to 60 seconds
# so it has had a chance to try out all the retries.
# https://www.scraperapi.com/documentation/
self._TIMEOUT = 60
prefix = "http://scraperapi.retry_404=true"
if country_code is not None:
prefix += ".country_code=" + country_code
if premium:
prefix += ".premium=true"
if render:
prefix += ".render=true"
# Suppress the unavoidable insecure request warnings with ScraperAPI
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
for _ in range(3):
proxy_works = self._use_proxy(http=f'{prefix}:{API_KEY}@proxy-server.scraperapi.com:8001')
if proxy_works:
proxies = {'http://': f"{prefix}:{API_KEY}@proxy-server.scraperapi.com:8001",}
self.logger.info("ScraperAPI proxy setup successfully")
self._new_session(verify=False, proxies=proxies)
return proxy_works
if (r["requestCount"] >= r["requestLimit"]):
self.logger.warning("ScraperAPI account limit reached.")
else:
self.logger.warning("ScraperAPI does not seem to work. Reason unknown.")
return False
[docs] def has_proxy(self) -> bool:
return self._proxy_gen or self._can_refresh_tor
def _set_proxy_generator(self, gen: Callable[..., str]) -> bool:
self._proxy_gen = gen
return True
def get_next_proxy(self, num_tries = None, old_timeout = 3, old_proxy=None):
    """Switch to a new identity or proxy and return the session to use.

    :param num_tries: number of the failed try, only used for logging
    :param old_timeout: timeout returned unchanged when neither Tor nor a
                        proxy generator is available
    :param old_proxy: the proxy that stopped working; it is handed back to
                      the proxy generator
    :returns: a tuple ``(session, timeout)``
    """
    if self._can_refresh_tor:
        # Check if Tor is running and refresh it
        self.logger.info("Refreshing Tor ID...")
        self._refresh_tor_id(self._tor_control_port, self._tor_password)
        time.sleep(5)  # wait for the refresh to happen
        # Reset timeout to default
        return self._session, self._TIMEOUT

    if not self._proxy_gen:
        # Nothing to rotate: just start over with a new session.
        self._new_session()
        return self._session, old_timeout

    if (num_tries):
        self.logger.info("Try #%d failed. Switching proxy.", num_tries)
    # Try to get another proxy
    candidate = self._proxy_gen(old_proxy)
    while (not self._use_proxy(candidate)):
        candidate = self._proxy_gen(candidate)
    default_timeout = self._TIMEOUT  # Reset timeout to default
    self._new_session()
    return self._session, default_timeout
# A context manager to suppress the misleading traceback from UserAgent()
# Based on https://thesmithfam.org/blog/2012/10/25/temporarily-suppress-console-output-in-python/
@staticmethod
@contextmanager
def _suppress_logger(loggerName: str, level=logging.CRITICAL):
"""Temporarily suppress logging output from a specific logger.
"""
logger = logging.getLogger(loggerName)
original_level = logger.getEffectiveLevel()
logger.setLevel(level)
try:
yield
finally:
logger.setLevel(original_level)