import elasticapm
import sergeant
import logging
import requests
class Worker(
sergeant.worker.Worker,
):
def generate_config(
self,
):
return sergeant.config.WorkerConfig(
name='test_worker',
connector=sergeant.config.Connector(
type='redis',
params={
'nodes': [
{
'host': 'localhost',
'port': 6379,
'password': None,
'database': 0,
},
],
},
),
max_tasks_per_run=100,
tasks_per_transaction=1,
max_retries=3,
logging=sergeant.config.Logging(
level=logging.INFO,
log_to_stdout=True,
),
)
def initialize(
self,
):
self.apm_client = elasticapm.Client(
server_url='http://localhost:8200/',
environment='development',
service_name=self.config.name,
service_version='1.0',
auto_log_stacks=True,
collect_local_variables='errors',
instrument=True,
metrics_interval='30s',
)
def finalize(
self,
):
self.apm_client.close()
def pre_work(
self,
task,
):
self.apm_client.begin_transaction(
transaction_type='work',
trace_parent=task.trace_id,
)
def post_work(
self,
task,
success,
exception,
):
if exception is not None:
self.apm_client.capture_exception()
self.apm_client.end_transaction(
name='work',
result='success' if success else 'failure',
)
def work(
self,
task,
):
url_to_crawl = task.kwargs['url']
response = requests.get(
url=url_to_crawl,
)
response.raise_for_status()