Source code for sci_watch.sci_watcher

from __future__ import annotations

import os
from datetime import datetime, timedelta
from pathlib import Path

import pytz
import toml

from sci_watch.parser.query import Query
from sci_watch.senders.gmail_sender import send_email
from sci_watch.senders.local_dir_sender import write_on_file
from sci_watch.senders.slack_sender import send_slack
from sci_watch.senders.teams_sender import send_teams
from sci_watch.source_wrappers.abstract_wrapper import SourceWrapper
from sci_watch.source_wrappers.arxiv_wrapper import ArxivWrapper
from sci_watch.source_wrappers.openai_wrapper import OpenAIBlogWrapper
from sci_watch.source_wrappers.techcrunch_wrapper import TechCrunchWrapper
from sci_watch.summarizers import AbstractSummarizer, get_summarizer
from sci_watch.utils.logger import get_logger, logging_wrapper
from sci_watch.watcher.watcher import Watcher

LOGGER = get_logger(__name__)


[docs]class SciWatcher: def __init__(self, config: dict) -> None: """ Parameters ---------- config: dict Configuration object containing the title, end_date, time_delta, recipients, sources, and queries """ self.title = config["title"] self.end_date = pytz.UTC.localize(self._get_end_date(config["end_date"])) self.start_date = self.end_date - self._get_time_delta(config["time_delta"]) # TODO: refactor fast. FAST! self.email_config = config.get("email") if self.email_config: LOGGER.info("Email config: %s", self.email_config) self.teams_config = config.get("teams") if self.teams_config: LOGGER.info("Teams config: %s", self.teams_config) self.slack_config = config.get("slack") if self.slack_config: LOGGER.info("Slack config: %s", self.slack_config) self.local_dir_config = config.get("local_dir") if self.local_dir_config: LOGGER.info("Local dir config: %s", self.local_dir_config) self.summarization_config = config.get("summarize") self.summarizer = None if self.summarization_config: LOGGER.info( "Creating summarizer with config: %s", self.summarization_config ) self.summarizer = self._get_summarizer(self.summarization_config) assert ( self.email_config is not None or self.teams_config is not None ), "Email and/or Teams config should be set." sources = self._load_sources(config["source"]) LOGGER.info("Got %i sources", len(sources)) queries = self._load_queries(config["query"]) LOGGER.info("Got %i queries", len(queries)) self.watchers = [Watcher(query=query, sources=sources) for query in queries] LOGGER.info("Got %i watchers", len(self.watchers))
[docs] @classmethod def from_toml(cls, path: os.PathLike | Path) -> SciWatcher: """ Create a Scrapper from toml file Parameters ---------- path: os.PathLike Path to a .toml file Returns ------- Scrapper: A scrapper object """ config = toml.load(path) return cls(config=config)
@staticmethod @logging_wrapper(LOGGER) def _get_summarizer(summarizer_config: dict[str, ...]) -> AbstractSummarizer: summarizer_type = summarizer_config["type"] return get_summarizer(type=summarizer_type, summarizer_kwargs=summarizer_config) @staticmethod def _get_time_delta(time_delta: str) -> timedelta: """ Parse `time_delta` string into `time.timedelta` object Parameters ---------- time_delta: str Time delta string to parse Returns ------- timedelta: Parsed `time.timedelta` object """ time_delta = datetime.strptime(time_delta, "%d:%H:%M:%S") return timedelta( days=time_delta.day, hours=time_delta.hour, minutes=time_delta.minute, seconds=time_delta.second, ) @staticmethod def _get_end_date(end_date: str) -> datetime: """ Parse an `end_date` string into a `time.datetime` object Parameters ---------- end_date: str String representation of end date Returns ------- datetime: End date as `time.datetime` object """ if end_date == "now": return datetime.now() else: raise NotImplementedError( 'Currently only "now" is supported for "end_date" field' ) @logging_wrapper(LOGGER) def _load_sources(self, sources: list[dict]) -> list[SourceWrapper]: """ Load sources from their string representation Parameters ---------- sources: list[dict] List of sources with their parameters as a dict/string representation Returns ------- list[SourceWrapper]: List of ready to use `SourceWrapper` subclasses objects (sources) """ source_objects = [] for source_params in sources: source_type = source_params["type"] if source_type == "arxiv": source_objects.append( ArxivWrapper( start_date=self.start_date, end_date=self.end_date, search_topic=source_params["search_topic"], max_documents=source_params["max_documents"], ) ) elif source_type == "techcrunch": source_objects.append( TechCrunchWrapper( start_date=self.start_date, end_date=self.end_date, search_topic=source_params["search_topic"], max_documents=source_params["max_documents"], ) ) elif source_type == "openai_blog": source_objects.append( OpenAIBlogWrapper( start_date=self.start_date, end_date=self.end_date, max_documents=source_params["max_documents"], ) ) else: raise ValueError(f"No source named {source_type}.") return source_objects @staticmethod @logging_wrapper(LOGGER) def _load_queries(queries: list[dict]) -> list[Query]: """ Load queries from their string representation Parameters ---------- queries: list[dict] List of queries with their parameters as a dict/string representation Returns ------- list[Query]: List of ready-to-use query objects """ query_objects = [] for query_params in queries: current_query = Query( title=query_params["title"], raw_content=query_params["raw_content"] ) query_objects.append(current_query) return query_objects
[docs] @logging_wrapper(LOGGER) def exec(self) -> None: """ Run the queries on all the sources, sends an email and/or teams message if relevant documents are found """ docs = [] for watcher in self.watchers: docs.extend(watcher.exec()) if len(docs) > 0: LOGGER.info("Got %i documents in total", len(docs)) summaries = ( self.summarizer.batch_summarize(docs=docs) if self.summarizer else None ) if self.local_dir_config: LOGGER.info("Writing locally at %s", self.local_dir_config["path"]) write_on_file( docs=docs, summaries=summaries, output_dir=Path(self.local_dir_config["path"]), ) if self.slack_config: LOGGER.info("Sending slack message") send_slack( channel_id=self.slack_config["channel_id"], documents=docs, summaries=summaries, ) if self.email_config: LOGGER.info("Sending email") send_email( subject=f'{self.title} - {datetime.now().strftime("%Y-%m-%d")}', recipients=self.email_config["recipients"], docs=docs, summaries=summaries, ) if self.teams_config: LOGGER.info("Sending teams message") send_teams( webhook_url=self.teams_config["webhook_url"], docs=docs, summaries=summaries, ) else: LOGGER.warning("Got 0 relevant documents using the config %s", self.title)