Source code for scholarly._proxy_generator

from typing import Callable
from fp.fp import FreeProxy
import random
import logging
import time
import requests
import httpx
import tempfile
import urllib3

from selenium import webdriver
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.common.by import By
from selenium.common.exceptions import (TimeoutException, WebDriverException,
                                        UnexpectedAlertPresentException)
from selenium.webdriver.firefox.options import Options as FirefoxOptions
from urllib.parse import urlparse
from contextlib import contextmanager
from deprecated import deprecated
try:
    import stem.process
    from stem import Signal
    from stem.control import Controller
except ImportError:
    stem = None

try:
    from fake_useragent import UserAgent
    FAKE_USERAGENT = True
except Exception:
    FAKE_USERAGENT = False
    DEFAULT_USER_AGENT = 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.149 Safari/537.36'

from .data_types import ProxyMode


class DOSException(Exception):
    """DOS attack was detected."""
class MaxTriesExceededException(Exception):
    """Maximum number of tries by scholarly reached."""
class ProxyGenerator(object):
    def __init__(self):
        # Setting up the logger
        self.logger = logging.getLogger('scholarly')

        self._proxy_gen = None
        # If we use a proxy or Tor, we set this to True
        self._proxy_works = False
        self.proxy_mode = None
        self._proxies = {}
        # If we have a Tor server that we can refresh, we set this to True
        self._tor_process = None
        self._can_refresh_tor = False
        self._tor_control_port = None
        self._tor_password = None
        self._session = None
        self._webdriver = None
        self._TIMEOUT = 5
        self._new_session()

    def __del__(self):
        if self._tor_process:
            self._tor_process.kill()
            self._tor_process.wait()
        self._close_session()
    def get_session(self):
        return self._session
    def Luminati(self, usr, passwd, proxy_port):
        """
        Sets up a Luminati proxy without refreshing capabilities.

        :param usr: scholarly username, optional by default None
        :type usr: string
        :param passwd: scholarly password, optional by default None
        :type passwd: string
        :param proxy_port: port for the proxy, optional by default None
        :type proxy_port: integer

        :returns: whether or not the proxy was set up successfully
        :rtype: {bool}

        :Example::
            >>> pg = ProxyGenerator()
            >>> success = pg.Luminati(usr="foo", passwd="bar", proxy_port=1200)
        """
        if (usr is not None and passwd is not None and proxy_port is not None):
            username = usr
            password = passwd
            port = proxy_port
        else:
            self.logger.warning("Not enough parameters were provided for the Luminati proxy. "
                                "Reverting to a local connection.")
            return
        session_id = random.random()
        proxy = f"http://{username}-session-{session_id}:{password}@zproxy.lum-superproxy.io:{port}"
        proxy_works = self._use_proxy(http=proxy, https=proxy)
        if proxy_works:
            self.logger.info("Luminati proxy setup successfully")
            self.proxy_mode = ProxyMode.LUMINATI
        else:
            self.logger.warning("Luminati does not seem to work. Reason unknown.")
        return proxy_works
    def SingleProxy(self, http=None, https=None):
        """
        Use a proxy of your choice.

        :param http: http proxy address
        :type http: string
        :param https: https proxy address
        :type https: string

        :returns: whether or not the proxy was set up successfully
        :rtype: {bool}

        :Example::
            >>> pg = ProxyGenerator()
            >>> success = pg.SingleProxy(http=<http proxy address>, https=<https proxy address>)
        """
        self.logger.info("Enabling proxies: http=%s https=%s", http, https)
        proxy_works = self._use_proxy(http=http, https=https)
        if proxy_works:
            self.proxy_mode = ProxyMode.SINGLEPROXY
            self.logger.info("Proxy setup successfully")
        else:
            self.logger.warning("Unable to setup the proxy: http=%s https=%s. Reason unknown.",
                                http, https)
        return proxy_works
    def _check_proxy(self, proxies) -> bool:
        """Checks if a proxy is working.

        :param proxies: A dictionary {'http://': url1, 'https://': url1}
                        with the urls of the proxies
        :returns: whether the proxy is working or not
        :rtype: {bool}
        """
        with requests.Session() as session:
            session.proxies = proxies
            try:
                resp = session.get("http://httpbin.org/ip", timeout=self._TIMEOUT)
                if resp.status_code == 200:
                    self.logger.info("Proxy works! IP address: %s", resp.json()["origin"])
                    return True
                elif resp.status_code == 401:
                    self.logger.warning("Incorrect credentials for proxy!")
                    return False
            except (TimeoutException, TimeoutError):
                time.sleep(self._TIMEOUT)
            except Exception as e:
                # Failure is common and expected with free proxies.
                # Do not log at warning level and annoy users.
                level = logging.DEBUG if self.proxy_mode is ProxyMode.FREE_PROXIES else logging.WARNING
                self.logger.log(level, "Exception while testing proxy: %s", e)
                if self.proxy_mode in (ProxyMode.LUMINATI, ProxyMode.SCRAPERAPI):
                    self.logger.warning("Double check your credentials and try increasing the timeout")

            return False

    def _refresh_tor_id(self, tor_control_port: int, password: str) -> tuple:
        """Refreshes the id by using a new Tor node.

        :returns: A tuple of (whether or not the refresh was successful,
                  the new session or None)
        :rtype: {tuple}
        """
        try:
            with Controller.from_port(port=tor_control_port) as controller:
                if password:
                    controller.authenticate(password=password)
                else:
                    controller.authenticate()
                controller.signal(Signal.NEWNYM)
                self._new_session()
            return (True, self._session)
        except Exception as e:
            err = f"Exception {e} while refreshing TOR. Retrying..."
            self.logger.info(err)
            return (False, None)

    def _use_proxy(self, http: str, https: str = None) -> bool:
        """Allows user to set their own proxy for the connection session.
        Sets the proxy if it works.

        :param http: the http proxy
        :type http: str
        :param https: the https proxy (defaults to the same as http)
        :type https: str
        :returns: whether or not the proxy was set up successfully
        :rtype: {bool}
        """
        if http[:4] != "http":
            http = "http://" + http
        if https is None:
            https = http
        elif https[:5] != "https":
            https = "https://" + https

        proxies = {'http://': http, 'https://': https}
        if self.proxy_mode == ProxyMode.SCRAPERAPI:
            r = requests.get("http://api.scraperapi.com/account",
                             params={'api_key': self._API_KEY}).json()
            if "error" in r:
                self.logger.warning(r["error"])
                self._proxy_works = False
            else:
                self._proxy_works = r["requestCount"] < int(r["requestLimit"])
                self.logger.info("Successful ScraperAPI requests %d / %d",
                                 r["requestCount"], r["requestLimit"])
        else:
            self._proxy_works = self._check_proxy(proxies)

        if self._proxy_works:
            self._proxies = proxies
            self._new_session(proxies=proxies)

        return self._proxy_works
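    # Illustrative sketch (not part of the module): how `_use_proxy` normalizes
    # its arguments into the httpx-style proxy mapping used above. The address
    # below is hypothetical.
    #
    #   pg = ProxyGenerator()
    #   pg._use_proxy("127.0.0.1:3128")
    #   # http is prefixed with "http://" and https falls back to http, so:
    #   # pg._proxies == {'http://': 'http://127.0.0.1:3128',
    #   #                 'https://': 'http://127.0.0.1:3128'}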
[docs] @deprecated(version='1.5', reason="Tor methods are deprecated and are not actively tested.") def Tor_External(self, tor_sock_port: int, tor_control_port: int, tor_password: str): """ Setting up Tor Proxy. A tor service should be already running on the system. Otherwise you might want to use Tor_Internal :param tor_sock_port: the port where the Tor sock proxy is running :type tor_sock_port: int :param tor_control_port: the port where the Tor control server is running :type tor_control_port: int :param tor_password: the password for the Tor control server :type tor_password: str :Example:: pg = ProxyGenerator() pg.Tor_External(tor_sock_port = 9050, tor_control_port = 9051, tor_password = "scholarly_password") Note: This method is deprecated since v1.5 """ if stem is None: raise RuntimeError("Tor methods are not supported with basic version of the package. " "Please install scholarly[tor] to use this method.") self._TIMEOUT = 10 proxy = f"socks5://127.0.0.1:{tor_sock_port}" self._use_proxy(http=proxy, https=proxy) self._can_refresh_tor, _ = self._refresh_tor_id(tor_control_port, tor_password) if self._can_refresh_tor: self._tor_control_port = tor_control_port self._tor_password = tor_password else: self._tor_control_port = None self._tor_password = None self.proxy_mode = ProxyMode.TOR_EXTERNAL # Setting requests timeout to be reasonably long # to accommodate slowness of the Tor network return { "proxy_works": self._proxy_works, "refresh_works": self._can_refresh_tor, "tor_control_port": tor_control_port, "tor_sock_port": tor_sock_port }
[docs] @deprecated(version='1.5', reason="Tor methods are deprecated and are not actively tested") def Tor_Internal(self, tor_cmd=None, tor_sock_port=None, tor_control_port=None): ''' Starts a Tor client running in a scholarly-specific port, together with a scholarly-specific control port. If no arguments are passed for the tor_sock_port and the tor_control_port they are automatically generated in the following ranges - tor_sock_port: (9000, 9500) - tor_control_port: (9500, 9999) :param tor_cmd: tor executable location (absolute path if its not exported in PATH) :type tor_cmd: string :param tor_sock_port: tor socket port :type tor_sock_port: int :param tor_control_port: tor control port :type tor_control_port: int :Example:: pg = ProxyGenerator() pg.Tor_Internal(tor_cmd = 'tor') Note: This method is deprecated since v1.5 ''' if stem is None: raise RuntimeError("Tor methods are not supported with basic version of the package. " "Please install scholarly[tor] to use this method.") self.logger.info("Attempting to start owned Tor as the proxy") if tor_cmd is None: self.logger.info("No tor_cmd argument passed. This should point to the location of Tor executable.") return { "proxy_works": False, "refresh_works": False, "tor_control_port": None, "tor_sock_port": None } if tor_sock_port is None: # Picking a random port to avoid conflicts # with simultaneous runs of scholarly tor_sock_port = random.randrange(9000, 9500) if tor_control_port is None: # Picking a random port to avoid conflicts # with simultaneous runs of scholarly tor_control_port = random.randrange(9500, 9999) # TODO: Check that the launched Tor process stops after scholar is done self._tor_process = stem.process.launch_tor_with_config( tor_cmd=tor_cmd, config={ 'ControlPort': str(tor_control_port), 'SocksPort': str(tor_sock_port), 'DataDirectory': tempfile.mkdtemp() # TODO Perhaps we want to also set a password here }, # take_ownership=True # Taking this out for now, as it seems to cause trouble ) self.proxy_mode = ProxyMode.TOR_INTERNAL return self.Tor_External(tor_sock_port, tor_control_port, tor_password=None)
    def _has_captcha(self, got_id, got_class) -> bool:
        _CAPTCHA_IDS = [
            "gs_captcha_ccl",  # the normal captcha div
            "recaptcha",       # the form used on full-page captchas
            "captcha-form",    # another form used on full-page captchas
        ]
        _DOS_CLASSES = [
            "rc-doscaptcha-body",
        ]
        if any([got_class(c) for c in _DOS_CLASSES]):
            raise DOSException()
        return any([got_id(i) for i in _CAPTCHA_IDS])

    def _webdriver_has_captcha(self) -> bool:
        """Tests whether the current webdriver page contains a captcha.

        :returns: whether or not the site contains a captcha
        :rtype: {bool}
        """
        return self._has_captcha(
            lambda i: len(self._get_webdriver().find_elements(By.ID, i)) > 0,
            lambda c: len(self._get_webdriver().find_elements(By.CLASS_NAME, c)) > 0,
        )

    def _get_webdriver(self):
        if self._webdriver:
            try:
                _ = self._webdriver.current_url
                return self._webdriver
            except Exception as e:
                self.logger.debug(e)

        try:
            return self._get_firefox_webdriver()
        except Exception as err:
            self.logger.debug("Cannot open Firefox/Geckodriver: %s", err)
        try:
            return self._get_chrome_webdriver()
        except Exception as err:
            self.logger.debug("Cannot open Chrome: %s", err)
        self.logger.info("Neither Chrome nor Firefox/Geckodriver found in PATH")

    def _get_chrome_webdriver(self):
        if self._proxy_works:
            webdriver.DesiredCapabilities.CHROME['proxy'] = {
                "httpProxy": self._proxies['http'],
                "sslProxy": self._proxies['https'],
                "proxyType": "MANUAL",
            }
        options = webdriver.ChromeOptions()
        options.add_argument('--headless')
        self._webdriver = webdriver.Chrome('chromedriver', options=options)
        self._webdriver.get("https://scholar.google.com")  # Need to pre-load to set cookies later
        return self._webdriver

    def _get_firefox_webdriver(self):
        if self._proxy_works:
            # Redirect webdriver through proxy
            webdriver.DesiredCapabilities.FIREFOX['proxy'] = {
                "httpProxy": self._proxies['http'],
                "sslProxy": self._proxies['https'],
                "proxyType": "MANUAL",
            }
        options = FirefoxOptions()
        options.add_argument('--headless')
        self._webdriver = webdriver.Firefox(options=options)
        self._webdriver.get("https://scholar.google.com")  # Need to pre-load to set cookies later

        # It might make sense to (pre)set cookies as well, e.g., to set a GSP ID.
        # However, a limitation of webdriver makes it impossible to set cookies for
        # domains other than the current active one, cf. https://github.com/w3c/webdriver/issues/1238
        # Therefore setting cookies in the session instance for domains other than the one set above
        # (e.g., via self._session.cookies.set) will create problems when transferring them to the
        # webdriver when handling captchas.

        return self._webdriver

    def _handle_captcha2(self, url):
        cur_host = urlparse(self._get_webdriver().current_url).hostname
        for cookie in self._session.cookies:
            # Only set cookies matching the current domain,
            # cf. https://github.com/w3c/webdriver/issues/1238
            if cur_host == cookie.domain.lstrip('.'):
                self._get_webdriver().add_cookie({
                    'name': cookie.name,
                    'value': cookie.value,
                    'path': cookie.path,
                    'domain': cookie.domain,
                })
        self._get_webdriver().get(url)

        log_interval = 10
        cur = 0
        timeout = 60*60*24*7  # 1 week
        while cur < timeout:
            try:
                cur = cur + log_interval  # Update before exceptions can happen
                WebDriverWait(self._get_webdriver(), log_interval).until_not(
                    lambda drv: self._webdriver_has_captcha())
                break
            except TimeoutException:
                self.logger.info(f"Solving the captcha took already {cur} seconds (of maximum {timeout} s).")
            except UnexpectedAlertPresentException as e:
                # This can apparently happen when reCAPTCHA has hiccups:
                # "Cannot contact reCAPTCHA. Check your connection and try again."
                self.logger.info(f"Unexpected alert while waiting for captcha completion: {e.args}")
                time.sleep(15)
            except DOSException as e:
                self.logger.info("Google thinks we are DOSing the captcha.")
                raise e
            except WebDriverException as e:
                self.logger.info("Browser seems to be dysfunctional - closed by user?")
                raise e
            except Exception as e:
                # TODO: This exception handler should eventually be removed when
                # we know the "typical" (non-error) exceptions that can occur.
                self.logger.info(f"Unhandled {type(e).__name__} while waiting for captcha completion: {e.args}")
        else:
            raise TimeoutException(f"Could not solve captcha in time (within {timeout} s).")
        self.logger.info(f"Solved captcha in less than {cur} seconds.")

        for cookie in self._get_webdriver().get_cookies():
            cookie.pop("httpOnly", None)
            cookie.pop("expiry", None)
            cookie.pop("sameSite", None)
            self._session.cookies.set(**cookie)

        return self._session

    def _new_session(self, **kwargs):
        init_kwargs = {"follow_redirects": True}
        init_kwargs.update(kwargs)
        proxies = {}
        if self._session:
            proxies = self._proxies
            self._close_session()
        # self._session = httpx.Client()
        self.got_403 = False

        if FAKE_USERAGENT:
            # Suppress the misleading traceback from UserAgent()
            with self._suppress_logger('fake_useragent'):
                user_agent = UserAgent().random
        else:
            user_agent = DEFAULT_USER_AGENT

        _HEADERS = {
            'accept-language': 'en-US,en',
            'accept': 'text/html,application/xhtml+xml,application/xml',
            'User-Agent': user_agent,
        }
        # self._session.headers.update(_HEADERS)
        init_kwargs.update(headers=_HEADERS)

        if self._proxy_works:
            init_kwargs["proxies"] = proxies
            self._proxies = proxies
        if self.proxy_mode is ProxyMode.SCRAPERAPI:
            # SSL certificate verification must be disabled for
            # ScraperAPI requests to work.
            # https://www.scraperapi.com/documentation/
            init_kwargs["verify"] = False
        self._session = httpx.Client(**init_kwargs)
        self._webdriver = None

        return self._session

    def _close_session(self):
        if self._session:
            self._session.close()
        if self._webdriver:
            try:
                self._webdriver.quit()
            except Exception as e:
                self.logger.warning("Could not close webdriver cleanly: %s", e)

    def _fp_coroutine(self, timeout=1, wait_time=120):
        """A coroutine to continuously yield free proxies.

        It takes back the proxies that stopped working and marks them as dirty.
        """
        freeproxy = FreeProxy(rand=False, timeout=timeout)
        if not hasattr(self, '_dirty_freeproxies'):
            self._dirty_freeproxies = set()
        try:
            all_proxies = freeproxy.get_proxy_list(repeat=False)  # free-proxy >= 1.1.0
        except TypeError:
            all_proxies = freeproxy.get_proxy_list()  # free-proxy < 1.1.0
        all_proxies.reverse()  # Try the older proxies first

        t1 = time.time()
        while (time.time() - t1 < wait_time):
            proxy = all_proxies.pop()
            if not all_proxies:
                all_proxies = freeproxy.get_proxy_list()
            if proxy in self._dirty_freeproxies:
                continue
            proxies = {'http://': proxy, 'https://': proxy}
            proxy_works = self._check_proxy(proxies)
            if proxy_works:
                dirty_proxy = (yield proxy)
                t1 = time.time()
            else:
                dirty_proxy = proxy
            self._dirty_freeproxies.add(dirty_proxy)
    def FreeProxies(self, timeout=1, wait_time=120):
        """
        Sets up continuously rotating proxies from the free-proxy library.

        :param timeout: Timeout for a single proxy in seconds, optional
        :type timeout: float
        :param wait_time: Maximum time (in seconds) to wait until a newer set of
                          proxies becomes available at https://sslproxies.org/
        :type wait_time: float

        :returns: whether or not the proxy was set up successfully
        :rtype: {bool}

        :Example::
            >>> pg = ProxyGenerator()
            >>> success = pg.FreeProxies()
        """
        # FreeProxies is the only mode that is assigned regardless of
        # whether the setup succeeds or not.
        self.proxy_mode = ProxyMode.FREE_PROXIES

        self._fp_gen = self._fp_coroutine(timeout=timeout, wait_time=wait_time)
        self._proxy_gen = self._fp_gen.send
        proxy = self._proxy_gen(None)  # prime the generator
        self.logger.debug("Trying with proxy %s", proxy)
        proxy_works = self._use_proxy(proxy)
        n_retries = 200
        n_tries = 0
        while (not proxy_works) and (n_tries < n_retries):
            self.logger.debug("Trying with proxy %s", proxy)
            proxy_works = self._use_proxy(proxy)
            n_tries += 1
            if not proxy_works:
                proxy = self._proxy_gen(proxy)

        if n_tries == n_retries:
            n_dirty = len(self._dirty_freeproxies)
            self._fp_gen.close()
            msg = ("None of the free proxies are working at the moment. "
                   f"Marked {n_dirty} proxies dirty. Try again after a few minutes.")
            raise MaxTriesExceededException(msg)
        else:
            return True
    def ScraperAPI(self, API_KEY, country_code=None, premium=False, render=False):
        """
        Sets up a proxy using ScraperAPI.

        The optional parameters are only for Business and Enterprise plans with
        ScraperAPI. For more details, see https://www.scraperapi.com/documentation/

        :Example::
            >>> pg = ProxyGenerator()
            >>> success = pg.ScraperAPI(API_KEY)

        :param API_KEY: ScraperAPI API Key value.
        :type API_KEY: string
        :param country_code: country code for geotargeted proxies
        :type country_code: string, optional by default None
        :param premium: whether to use premium residential/mobile proxies
        :type premium: bool, optional by default False
        :param render: whether to render JavaScript on the page
        :type render: bool, optional by default False
        :returns: whether or not the proxy was set up successfully
        :rtype: {bool}
        """
        if API_KEY is None:
            raise ValueError("ScraperAPI API Key is required.")
        # Get basic account information. This will NOT be counted towards
        # successful API requests.
        r = requests.get("http://api.scraperapi.com/account",
                         params={'api_key': API_KEY}).json()
        if "error" in r:
            self.logger.warning(r["error"])
            return False

        self._API_KEY = API_KEY
        self.proxy_mode = ProxyMode.SCRAPERAPI
        r["requestLimit"] = int(r["requestLimit"])
        self.logger.info("Successful ScraperAPI requests %d / %d",
                         r["requestCount"], r["requestLimit"])

        # ScraperAPI documentation recommends setting the timeout to 60 seconds
        # so it has had a chance to try out all the retries.
        # https://www.scraperapi.com/documentation/
        self._TIMEOUT = 60

        prefix = "http://scraperapi.retry_404=true"
        if country_code is not None:
            prefix += ".country_code=" + country_code
        if premium:
            prefix += ".premium=true"
        if render:
            prefix += ".render=true"

        # Suppress the unavoidable insecure request warnings with ScraperAPI
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

        for _ in range(3):
            proxy_works = self._use_proxy(http=f'{prefix}:{API_KEY}@proxy-server.scraperapi.com:8001')
            if proxy_works:
                proxies = {'http://': f"{prefix}:{API_KEY}@proxy-server.scraperapi.com:8001"}
                self.logger.info("ScraperAPI proxy setup successfully")
                self._new_session(verify=False, proxies=proxies)
                return proxy_works
        if (r["requestCount"] >= r["requestLimit"]):
            self.logger.warning("ScraperAPI account limit reached.")
        else:
            self.logger.warning("ScraperAPI does not seem to work. Reason unknown.")
        return False
    def has_proxy(self) -> bool:
        return self._proxy_gen or self._can_refresh_tor
    def _set_proxy_generator(self, gen: Callable[..., str]) -> bool:
        self._proxy_gen = gen
        return True
    def get_next_proxy(self, num_tries=None, old_timeout=3, old_proxy=None):
        """Switches to the next working proxy (or refreshes the Tor ID) and
        returns the new session together with the timeout to use with it.
        """
        new_timeout = old_timeout
        if self._can_refresh_tor:
            # Check if Tor is running and refresh it
            self.logger.info("Refreshing Tor ID...")
            self._refresh_tor_id(self._tor_control_port, self._tor_password)
            time.sleep(5)  # wait for the refresh to happen
            new_timeout = self._TIMEOUT  # Reset timeout to default
        elif self._proxy_gen:
            if (num_tries):
                self.logger.info("Try #%d failed. Switching proxy.", num_tries)
            # Try to get another proxy
            new_proxy = self._proxy_gen(old_proxy)
            while (not self._use_proxy(new_proxy)):
                new_proxy = self._proxy_gen(new_proxy)
            new_timeout = self._TIMEOUT  # Reset timeout to default
            self._new_session()
        else:
            self._new_session()

        return self._session, new_timeout
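    # Illustrative sketch (not part of the module): how a caller might rotate
    # proxies with `get_next_proxy` after a failed request. The retry loop and
    # the `url` variable below are hypothetical.
    #
    #   session, timeout = pg.get_next_proxy()
    #   for num_tries in range(1, 5):
    #       try:
    #           resp = session.get(url, timeout=timeout)
    #           break
    #       except Exception:
    #           session, timeout = pg.get_next_proxy(num_tries=num_tries)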
    # A context manager to suppress the misleading traceback from UserAgent()
    # Based on https://thesmithfam.org/blog/2012/10/25/temporarily-suppress-console-output-in-python/
    @staticmethod
    @contextmanager
    def _suppress_logger(loggerName: str, level=logging.CRITICAL):
        """Temporarily suppress logging output from a specific logger."""
        logger = logging.getLogger(loggerName)
        original_level = logger.getEffectiveLevel()
        logger.setLevel(level)
        try:
            yield
        finally:
            logger.setLevel(original_level)
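
# Usage sketch (assumes scholarly's public API; illustrative only):
#
#   from scholarly import scholarly, ProxyGenerator
#
#   pg = ProxyGenerator()
#   if pg.FreeProxies():          # or pg.ScraperAPI(API_KEY), pg.SingleProxy(...)
#       scholarly.use_proxy(pg)
#   author = next(scholarly.search_author("Marie Curie"))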