class Bigdata:
    """
    Represents a connection to RavenPack's Bigdata API.

    :ivar knowledge_graph: Proxy for the knowledge graph search functionality.
    :ivar search: Proxy object for the content search functionality.
    :ivar watchlists: Proxy object for the watchlist functionality.
    :ivar uploads: Proxy object for the internal content functionality.
    :ivar subscription: Proxy object for the subscription functionality.
    :ivar chat: Proxy object for the chat functionality.
    """

    def __init__(
        self,
        username: Optional[str] = None,
        password: Optional[str] = None,
        *,
        bigdata_api_url: Optional[str] = None,
        bigdata_ws_url: Optional[str] = None,
        upload_api_url: Optional[str] = None,
        proxy: Optional[Proxy] = None,
        verify_ssl: Union[bool, str] = True,
    ):
        """
        Authenticate and set up the service proxies.

        :param username: Account username. When ``None`` it is read from the
            ``BIGDATA_USERNAME`` environment variable, falling back to the
            deprecated ``BIGDATA_USER`` variable.
        :param password: Account password. When ``None`` it is read from the
            ``BIGDATA_PASSWORD`` environment variable.
        :param bigdata_api_url: Backend API URL. Defaults to
            ``settings.BACKEND_API_URL``.
        :param bigdata_ws_url: Backend websocket URL. Defaults to
            ``settings.BACKEND_WS_API_URL``.
        :param upload_api_url: Uploads API URL. Defaults to
            ``settings.UPLOAD_API_URL``.
        :param proxy: Forwarded unchanged to
            ``Auth.from_username_and_password``.
        :param verify_ssl: Forwarded as ``verify`` to
            ``Auth.from_username_and_password``.
            NOTE(review): a ``str`` value is presumably a CA bundle path --
            confirm against ``Auth``.
        :raises ValueError: If no username or no password could be resolved.
        """
        if password is None:
            password = os.environ.get("BIGDATA_PASSWORD")

        if username is None:
            preferred_username = os.environ.get("BIGDATA_USERNAME")
            legacy_username = os.environ.get("BIGDATA_USER")
            username = preferred_username or legacy_username
            # Warn only when the deprecated variable is the one that actually
            # supplied the username; if BIGDATA_USERNAME is set, the user has
            # already migrated and the warning would be misleading.
            if legacy_username and not preferred_username:
                warnings.warn(
                    "BIGDATA_USER is deprecated, use BIGDATA_USERNAME instead",
                    DeprecationWarning,
                    stacklevel=2,  # attribute the warning to the caller
                )

        if username is None or password is None:
            raise ValueError("Username and password must be provided")

        auth = Auth.from_username_and_password(
            username,
            password,
            clerk_frontend_url=str(settings.CLERK_FRONTEND_URL),
            clerk_instance_type=settings.CLERK_INSTANCE_TYPE,
            pool_maxsize=settings.MAX_PARALLEL_REQUESTS,
            proxy=proxy,
            verify=verify_ssl,
        )
        # The organization id is taken from the session token's claims and is
        # only needed by the uploads connection below.
        organization_id = get_token_claim(
            token=auth._token_manager.get_session_token(),
            claim=JWT_CLAIM_ORGANIZATION_ID,
        )

        # Explicit URLs win; otherwise fall back to the configured defaults.
        if bigdata_api_url is None:
            bigdata_api_url = str(settings.BACKEND_API_URL)
        if bigdata_ws_url is None:
            bigdata_ws_url = str(settings.BACKEND_WS_API_URL)
        if upload_api_url is None:
            upload_api_url = str(settings.UPLOAD_API_URL)

        self._api = BigdataConnection(auth, bigdata_api_url, bigdata_ws_url)
        self._upload_api = UploadsConnection(
            auth, upload_api_url, organization_id=organization_id
        )

        # Start the different services
        self.knowledge_graph = KnowledgeGraph(self._api)
        self.search = ContentSearch(self._api)
        self.watchlists = Watchlists(self._api)
        self.uploads = Uploads(uploads_api=self._upload_api)
        self.subscription = Subscription(
            api_connection=self._api,
            uploads_api_connection=self._upload_api,
        )
        self.chat = ChatService(api_connection=self._api)