Threading - Find Entities¶
When there is need to resolve many entities, multithreading can be used to speed things up. Below is an example using the autosuggest method and the implementation would be similar with any of the other methods from the knowledge_graph family.
Note
The instance of Bigdata includes a built-in rate limiter to avoid making too many requests in a short period of time. More information can be foud at Multithreading with the Bigdata SDK.
Example¶
Here the goal is to use threads and not having to wait for the response of KnowledgeGraph.autosuggest
before making the next call.
We create a function autosuggest_handling_errors
that will be used by the threads.
Step 1: Create a function that invokes the method and wraps any possible error.
from bigdata_client import Bigdata
bigdata = Bigdata()
def autosuggest_handling_errors(bigdata: Bigdata, value: str):
try:
return bigdata.knowledge_graph.autosuggest(value)
except Exception as e:
return f"Error: {e}"
What to do when an error appears is up to the implementation, in this case, the error message is returned. You may prefer to log it, store it in a file or raise it and stop the execution.
Step 2: Set up the threads and use the function created in step 1.
def concurrent_autosuggest(
bigdata: Bigdata, values: list[str], max_concurrency: int
):
results = {}
with ThreadPoolExecutor(max_workers=max_concurrency) as executor:
futures_to_values = {
executor.submit(autosuggest_handling_errors, bigdata, value): value
for value in values
}
for future in concurrent.futures.as_completed(futures_to_values):
value_resolved = futures_to_values[future]
results[value_resolved] = future.result()
return results
The function above creates a thread pool and will take turns calling autosuggest_handling_errors
.
The response is returned as a dictionary.
Warning
The instance of Bigdata reuses the connection so in case you plan on creating many threads, increase the number of simultaneous connections allowed by configuring BIGDATA_MAX_PARALLEL_REQUESTS.
Step 3: Execution
import pprint
if __name__ == "__main__":
values = ["tesla", "apple"]
max_concurrency = 2
results = concurrent_autosuggest(bigdata, values, max_concurrency)
pprint.PrettyPrinter().pprint(results)
Output:
{'apple': [Company(id='D8442A', name='Apple Inc.', volume=None, description=None, entity_type='COMP', company_type='Public', country='United States', sector='Technology', industry_group='Computer Hardware', industry='Computer Hardware', ticker='AAPL'),
Concept(id='4AD3C9', name='Apple', volume=None, description=None, entity_type='FRTS', entity_type_name='Fruits', concept_level_2=None, concept_level_3=None, concept_level_4=None, concept_level_5=None)],
'tesla': [Company(id='DD3BB1', name='Tesla Inc.', volume=None, description=None, entity_type='COMP', company_type='Public', country='United States', sector='Consumer Goods', industry_group='Automobiles', industry='Automobiles', ticker='TSLA'),
Product(id='C7BCE3', name='Tesla Automobile', volume=None, description=None, entity_type='PROD', product_type='Electric Vehicle', product_owner='Tesla Inc.')]}
Full code¶
from bigdata_client import Bigdata
import concurrent.futures
from concurrent.futures.thread import ThreadPoolExecutor
import pprint
bigdata = Bigdata()
def autosuggest_handling_errors(bigdata: Bigdata, value: str):
try:
return bigdata.knowledge_graph.autosuggest(value)
except Exception as e:
return f"Error: {e}"
def concurrent_autosuggest(
bigdata: Bigdata, values: list[str], max_concurrency: int
):
results = {}
with ThreadPoolExecutor(max_workers=max_concurrency) as executor:
futures_to_values = {
executor.submit(autosuggest_handling_errors, bigdata, value): value
for value in values
}
for future in concurrent.futures.as_completed(futures_to_values):
value_resolved = futures_to_values[future]
results[value_resolved] = future.result()
return results
values = ["tesla", "apple"]
max_concurrency = 2
results = concurrent_autosuggest(bigdata, values, max_concurrency)
pprint.PrettyPrinter().pprint(results)