class File(BaseModel):
    """Representation of a file.

    Instances hold the file metadata and delegate every remote operation
    (delete, status polling, downloads, sharing, tagging) to the uploads
    API object stored in ``_uploads_api``.
    """

    _uploads_api: UploadsConnectionProtocol
    id: str
    name: Optional[str] = None
    status: Optional[FileStatus] = None
    uploaded_at: Optional[datetime] = None
    raw_size: Optional[int] = None
    folder_id: Optional[str] = None
    tags: list[str] = field(default_factory=lambda: [])
    company_shared_permission: Optional[SharePermission] = None

    def delete(self) -> None:
        """Delete the file from the server.

        The file must be fully processed before deleting.

        Raises:
            BigdataClientIncompatibleStateError: If the server answers
                ``425 Too Early`` because the file is still being processed.
            requests.HTTPError: For any other HTTP error, re-raised as is.
        """
        try:
            self._uploads_api.delete_file(self.id)
        except requests.HTTPError as e:
            if e.response.status_code == HTTPStatus.TOO_EARLY:
                raise BigdataClientIncompatibleStateError(
                    "File is being processed and cannot be deleted yet."
                ) from e
            raise

    def reload_status(self) -> None:
        """Update ``self.status`` with the status reported by the server.

        Raises:
            ValueError: If the status response carries an error.
        """
        status_response = self._uploads_api.get_file_status(self.id)
        if status_response.error:
            raise ValueError(status_response.error)
        self.status = status_response.status

    def wait_for_completion(self, timeout: Optional[int] = None) -> None:
        """Block until the file reaches a terminal status.

        The status is polled once per second until it is ``COMPLETED``,
        ``DELETED`` or ``FAILED``.

        Args:
            timeout (Optional[int]): Maximum number of seconds to wait, or
                ``None`` to wait indefinitely. Only the time spent sleeping
                between polls is counted, not the duration of the requests.

        Raises:
            TimeoutError: If the file is still not in a terminal status once
                ``timeout`` seconds have been waited.
            ValueError: If a status response carries an error.
        """
        completed_status = [
            FileStatus.COMPLETED,
            FileStatus.DELETED,
            FileStatus.FAILED,
        ]
        time_elapsed = 0
        delta = 1
        while self.status not in completed_status:
            time.sleep(delta)
            time_elapsed += delta
            self.reload_status()
            # Check the fresh status before the timeout: a file that finished
            # on the very last poll must not be reported as timed out.
            if self.status in completed_status:
                break
            if timeout is not None and time_elapsed >= timeout:
                raise TimeoutError("Timeout waiting for file to be processed")

    def _write_chunks(self, filename: str, chunks) -> None:
        """Write an iterable of bytes chunks to ``filename`` in binary mode."""
        with open(filename, "wb") as f:
            for chunk in chunks:
                f.write(chunk)

    def download_original(self, filename: str) -> None:
        """Download the original content of the file into ``filename``."""
        # GET /file/<id> returns the URL to S3, not the content
        # Other types of files can be downloaded directly
        response = self._uploads_api.get_download_presigned_url(self.id)
        content = get_chunks_from_presigned_url(response.url)
        self._write_chunks(filename, content)

    def download_analytics(self, filename: str) -> None:
        """Download the analytics of the file into ``filename``."""
        content = self._uploads_api.download_analytics(self.id)
        self._write_chunks(filename, content)

    def get_analytics_dict(self):
        """Retrieve the analytics of the file, parsed from JSON."""
        content_chunks = self._uploads_api.download_analytics(self.id)
        content = b"".join(content_chunks)
        return json.loads(content)

    def download_annotated(self, filename: str) -> None:
        """Download the annotated version of the file into ``filename``."""
        content = self._uploads_api.download_annotated(self.id)
        self._write_chunks(filename, content)

    def get_annotated_dict(self):
        """Retrieve the annotated version of the file, parsed from JSON."""
        content_chunks = self._uploads_api.download_annotated(self.id)
        content = b"".join(content_chunks)
        return json.loads(content)

    def share_with_company(self):
        """Share the file with the whole company.

        On success ``company_shared_permission`` is set to
        ``SharePermission.READ``.

        Returns:
            The API response, dumped with ``model_dump()``.

        Raises:
            BigdataClientIncompatibleStateError: If the server answers
                ``425 Too Early`` because the file is still being processed.
            requests.HTTPError: For any other HTTP error, re-raised as is.
        """
        try:
            response = self._uploads_api.share_file_with_company(file_id=self.id)
        except requests.HTTPError as e:
            if e.response.status_code == HTTPStatus.TOO_EARLY:
                raise BigdataClientIncompatibleStateError(
                    "File is being processed and cannot be shared yet."
                ) from e
            raise
        self.company_shared_permission = SharePermission.READ
        return response.model_dump()

    def unshare_with_company(self):
        """Stop sharing the file with the whole company.

        On success ``company_shared_permission`` is reset to ``None``.

        Returns:
            The API response, dumped with ``model_dump()``.

        Raises:
            BigdataClientIncompatibleStateError: If the server answers
                ``425 Too Early`` because the file is still being processed.
            requests.HTTPError: For any other HTTP error, re-raised as is.
        """
        try:
            response = self._uploads_api.unshare_file_with_company(file_id=self.id)
        except requests.HTTPError as e:
            if e.response.status_code == HTTPStatus.TOO_EARLY:
                raise BigdataClientIncompatibleStateError(
                    "File is being processed and cannot be unshared yet."
                ) from e
            raise
        self.company_shared_permission = None
        return response.model_dump()

    def add_tags(self, value: list[str]) -> dict:
        """Add tags to a file.

        The current tags are fetched from the server, merged with ``value``
        and sent back sorted and de-duplicated.

        Args:
            value (list[str]): Tags to be added.

        Returns:
            dict: File information.

        Raises:
            ValueError: If 'value' parameter is not a list.
            ValueError: If 'value' parameter is empty.
            ValueError: If 'value' contains only empty values.
            BigdataClientIncompatibleStateError: If the file is still being
                processed when the tags are updated.
        """
        self._validate_tags_value(value)
        file_response = self._uploads_api.get_file(id=self.id)
        updated_tags = set(file_response.tags or [])
        updated_tags.update(value)
        try:
            response = self._uploads_api.update_file_tags(
                file_id=self.id, tags=sorted(updated_tags)
            )
        except requests.HTTPError as e:
            if e.response.status_code == HTTPStatus.TOO_EARLY:
                raise BigdataClientIncompatibleStateError(
                    "File is being processed and its tags cannot be modified yet."
                ) from e
            raise
        self.tags = response.tags or []
        return response.model_dump()

    def remove_tags(self, value: list[str]) -> dict:
        """Remove tags from a file.

        The current tags are fetched from the server, the ones in ``value``
        are dropped and the remainder is sent back sorted.

        Args:
            value (list[str]): Tags to be removed.

        Returns:
            dict: File information.

        Raises:
            ValueError: If 'value' parameter is not a list.
            ValueError: If 'value' parameter is empty.
            ValueError: If 'value' contains only empty values.
            BigdataClientIncompatibleStateError: If the file is still being
                processed when the tags are updated.
        """
        self._validate_tags_value(value)
        file_response = self._uploads_api.get_file(id=self.id)
        updated_tags = set(file_response.tags or [])
        updated_tags.difference_update(value)
        try:
            response = self._uploads_api.update_file_tags(
                file_id=self.id, tags=sorted(updated_tags)
            )
        except requests.HTTPError as e:
            if e.response.status_code == HTTPStatus.TOO_EARLY:
                raise BigdataClientIncompatibleStateError(
                    "File is being processed and its tags cannot be modified yet."
                ) from e
            raise
        self.tags = response.tags or []
        return response.model_dump()

    def set_tags(self, value: list[str]) -> dict:
        """Set the tags of a file, replacing whatever tags it had.

        ``value`` is sent sorted and de-duplicated; the current tags are not
        fetched first.

        Args:
            value (list[str]): Tags the file should end up with.

        Returns:
            dict: File information.

        Raises:
            ValueError: If 'value' parameter is not a list.
            ValueError: If 'value' parameter is empty.
            ValueError: If 'value' contains only empty values.
            BigdataClientIncompatibleStateError: If the file is still being
                processed when the tags are updated.
        """
        self._validate_tags_value(value)
        try:
            response = self._uploads_api.update_file_tags(
                file_id=self.id, tags=sorted(set(value))
            )
        except requests.HTTPError as e:
            if e.response.status_code == HTTPStatus.TOO_EARLY:
                raise BigdataClientIncompatibleStateError(
                    "File is being processed and its tags cannot be modified yet."
                ) from e
            raise
        self.tags = response.tags or []
        return response.model_dump()

    def _validate_tags_value(self, value: list[str]) -> None:
        """Validate the argument given to the tag methods.

        Raises:
            ValueError: If ``value`` is not a list, is empty, or contains
                only falsy (empty) entries.
        """
        if not isinstance(value, list):
            raise ValueError("'value' must be a list.")
        if not value:
            raise ValueError("'value' cannot be empty.")
        # Only the all-empty case is rejected; a list mixing empty and
        # non-empty tags is accepted unchanged.
        if not any(value):
            raise ValueError("'value' cannot be composed of empty values.")

    def _download_text(self, filename: str) -> None:
        """Download the text extraction of the file into ``filename``.

        Marked as private to not cause confusion to the user.
        """
        content = self._uploads_api.download_text(self.id)
        self._write_chunks(filename, content)

    def __str__(self) -> str:
        """Return a string representation of the file with the ls -l format."""
        # NOTE(review): the placeholder literals below may have lost alignment
        # padding in the copy this was restored from; confirm the intended
        # column widths.
        file_id = self.id or "FILE NOT UPLOADED "
        if self.raw_size:
            size = padded(human_readable_size(self.raw_size), 4)
        else:
            size = " N/A"
        if self.uploaded_at:
            date = human_readable_date(self.uploaded_at)
        else:
            date = " N/A"
        name = self.name if self.name else "N/A"
        # Columns are separated by a single space so they do not run together.
        return f"{file_id} {size} {date} {name}"
def human_readable_size(num_bytes: float) -> str:
    """Return a human readable string of the given size in bytes.

    It displays the size in the highest unit possible:

    >>> human_readable_size(1)
    '1'
    >>> human_readable_size(32)
    '32'
    >>> human_readable_size(512 * 1024)
    '512K'
    >>> human_readable_size(3 * 1024 * 1024)
    '3M'
    >>> human_readable_size(1024 * 1024 * 1024)
    '1G'

    It only shows the decimal part for values below 10, and only if it is
    not 0; values of 10 or more are truncated to an integer:

    >>> human_readable_size(1.1 * 1024)
    '1.1K'
    >>> human_readable_size(2.1 * 1024)
    '2.1K'
    >>> human_readable_size(9.9 * 1024)
    '9.9K'
    >>> human_readable_size(10.01 * 1024)
    '10K'
    >>> human_readable_size(52.5 * 1024)
    '52K'
    >>> human_readable_size(0.9 * 1024 * 1024)
    '921K'

    Finally, sizes from 1000 up to the next unit boundary are shown rounded
    in that next unit:

    >>> human_readable_size(1000)
    '1K'
    >>> human_readable_size(1023)
    '1K'
    >>> human_readable_size(1024 * 1024 - 1)
    '1M'

    Args:
        num_bytes: Size in bytes; ints and floats are both accepted.

    Returns:
        The size followed by its unit suffix (none for plain bytes, then
        K, M, G, T, P, E and finally Z for anything larger).
    """
    size = float(num_bytes)
    for unit in ["", "K", "M", "G", "T", "P", "E"]:
        # 1000 instead of 1024 to get things like 1M instead of 1001K
        if size < 1000:
            if size < 10:
                # One decimal, dropping a trailing ".0" ("1.0" -> "1")
                text = f"{size:.1f}".removesuffix(".0")
            else:
                text = f"{int(size)}"
            return f"{text}{unit}"
        size /= 1024
    # Larger than every unit in the list: report in the next unit up
    return f"{int(size)}Z"
def human_readable_date(date: datetime) -> str:
    """Return a human readable date of the given date.

    The result has the ``ls -l`` style shape ``"Mon DD YYYY"``: abbreviated
    month name, day of the month right-aligned in two columns (a leading
    zero is replaced by a space) and four-digit year.

    Args:
        date: The date to format.

    Returns:
        The formatted date, e.g. ``"Jan  5 2024"``.
    """
    month = date.strftime("%b")
    year = date.strftime("%Y")
    # Space-pad the day to two columns instead of zero-padding it, and keep
    # the three fields separated so they do not run together.
    return f"{month} {date.day:>2} {year}"
def padded(value: str, length: int) -> str:
    """Return ``value`` right-aligned in a field of ``length`` characters.

    Spaces are added on the left; a value already longer than ``length`` is
    returned unchanged.

    Args:
        value: The text to align.
        length: The minimum width of the result.

    Returns:
        The right-aligned string.
    """
    return f"{value:>{length}}"