Skip to content
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).

## Unreleased

### Added
- Add support for `EXTRACTOR_KEY` and `CLOWDER_EMAIL` environment variables to register
an extractor for just one user.

## 2.6.0 - 2022-06-14

This will change how clowder sees the extractors. If you have an extractor, and you specify
Expand Down
45 changes: 29 additions & 16 deletions pyclowder/connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class Connector(object):
registered_clowder = list()

def __init__(self, extractor_name, extractor_info, check_message=None, process_message=None, ssl_verify=True,
mounted_paths=None, clowder_url=None, max_retry=10):
mounted_paths=None, clowder_url=None, max_retry=10, extractor_key=None, clowder_email=None):
self.extractor_name = extractor_name
self.extractor_info = extractor_info
self.check_message = check_message
Expand All @@ -77,6 +77,8 @@ def __init__(self, extractor_name, extractor_info, check_message=None, process_m
else:
self.mounted_paths = mounted_paths
self.clowder_url = clowder_url
self.clowder_email = clowder_email
self.extractor_key = extractor_key
self.max_retry = max_retry

filename = 'notifications.json'
Expand Down Expand Up @@ -399,8 +401,8 @@ def _process_message(self, body):
# register extractor
url = "%sapi/extractors" % source_host
if url not in Connector.registered_clowder:
Connector.registered_clowder.append(url)
self.register_extractor("%s?key=%s" % (url, secret_key))
Connector.registered_clowder.append(url)

# tell everybody we are starting to process the file
self.status_update(pyclowder.utils.StatusMessage.start, resource, "Started processing.")
Expand Down Expand Up @@ -518,18 +520,24 @@ def register_extractor(self, endpoints):

headers = {'Content-Type': 'application/json'}
data = self.extractor_info
if self.extractor_key:
data["unique_key"] = self.extractor_key

for url in endpoints.split(','):
if url not in Connector.registered_clowder:
Connector.registered_clowder.append(url)
try:
result = requests.post(url.strip(), headers=headers,
data=json.dumps(data),
verify=self.ssl_verify)
result.raise_for_status()
logger.debug("Registering extractor with %s : %s", url, result.text)
except Exception as exc: # pylint: disable=broad-except
logger.exception('Error in registering extractor: ' + str(exc))
if "unique_key" in data:
if url.find("?") > -1:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if this is the first parameter?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the config has specified a registration URL without an API key, registering privately with just an email address won't be accepted. so it doesn't send the request.

url += "&user=%s" % self.clowder_email
else:
logger.info("Unable to register extractor without an API key.")
return
try:
result = requests.post(url.strip(), headers=headers,
data=json.dumps(data),
verify=self.ssl_verify)
result.raise_for_status()
logger.info("Registering extractor as %s : %s", url, result.text)
except Exception as exc: # pylint: disable=broad-except
logger.exception('Error in registering extractor: ' + str(exc))

# pylint: disable=no-self-use
def status_update(self, status, resource, message):
Expand Down Expand Up @@ -636,16 +644,19 @@ class RabbitMQConnector(Connector):
def __init__(self, extractor_name, extractor_info,
rabbitmq_uri, rabbitmq_exchange=None, rabbitmq_key=None, rabbitmq_queue=None,
check_message=None, process_message=None, ssl_verify=True, mounted_paths=None,
heartbeat=5*60, clowder_url=None, max_retry=10):
heartbeat=10, clowder_url=None, max_retry=10, extractor_key=None, clowder_email=None):
super(RabbitMQConnector, self).__init__(extractor_name, extractor_info, check_message, process_message,
ssl_verify, mounted_paths, clowder_url, max_retry)
ssl_verify, mounted_paths, clowder_url, max_retry, extractor_key, clowder_email)
self.rabbitmq_uri = rabbitmq_uri
self.rabbitmq_exchange = rabbitmq_exchange
self.rabbitmq_key = rabbitmq_key
if rabbitmq_queue is None:
self.rabbitmq_queue = extractor_info['name']
else:
self.rabbitmq_queue = rabbitmq_queue
self.extractor_key = extractor_key
if extractor_key:
self.rabbitmq_queue = "private.%s.%s" % (extractor_key, self.rabbitmq_queue)
self.channel = None
self.connection = None
self.consumer_tag = None
Expand Down Expand Up @@ -693,7 +704,7 @@ def connect(self):
routing_key="extractors." + self.extractor_name)

# start the extractor announcer
self.announcer = RabbitMQBroadcast(self.rabbitmq_uri, self.extractor_info, self.rabbitmq_queue, self.heartbeat)
self.announcer = RabbitMQBroadcast(self.rabbitmq_uri, self.extractor_info, self.clowder_email, self.rabbitmq_queue, self.heartbeat)
self.announcer.start_thread()

def listen(self):
Expand Down Expand Up @@ -797,10 +808,11 @@ def on_message(self, channel, method, header, body):


class RabbitMQBroadcast:
def __init__(self, rabbitmq_uri, extractor_info, rabbitmq_queue, heartbeat):
def __init__(self, rabbitmq_uri, extractor_info, clowder_email, rabbitmq_queue, heartbeat):
self.active = True
self.rabbitmq_uri = rabbitmq_uri
self.extractor_info = extractor_info
self.clowder_email = clowder_email
self.rabbitmq_queue = rabbitmq_queue
self.heartbeat = heartbeat
self.id = str(uuid.uuid4())
Expand Down Expand Up @@ -830,6 +842,7 @@ def send_heartbeat(self):
message = {
'id': self.id,
'queue': self.rabbitmq_queue,
'owner': self.clowder_email,
'extractor_info': self.extractor_info
}
next_heartbeat = time.time()
Expand Down
18 changes: 17 additions & 1 deletion pyclowder/extractors.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ def __init__(self):
rabbitmq_exchange = os.getenv('RABBITMQ_EXCHANGE', "")
clowder_url = os.getenv("CLOWDER_URL", "")
registration_endpoints = os.getenv('REGISTRATION_ENDPOINTS', "")
extractor_key = os.getenv("EXTRACTOR_KEY", "")
clowder_email = os.getenv("CLOWDER_EMAIL", "")
logging_config = os.getenv("LOGGING")
mounted_paths = os.getenv("MOUNTED_PATHS", "{}")
input_file_path = os.getenv("INPUT_FILE_PATH")
Expand All @@ -91,6 +93,12 @@ def __init__(self):
self.parser.add_argument('--register', '-r', nargs='?', dest="registration_endpoints",
default=registration_endpoints,
help='Clowder registration URL (default=%s)' % registration_endpoints)
self.parser.add_argument('--key', '-k', dest="extractor_key",
default=extractor_key,
help='Unique key to use for extractor queue ID (sets extractor to private)')
self.parser.add_argument('--user', '-u', dest="clowder_email",
default=clowder_email,
help='Email address of Clowder user who will initially be assigned ownership (ignored if no --key provided)')
self.parser.add_argument('--rabbitmqURI', nargs='?', dest='rabbitmq_uri', default=rabbitmq_uri,
help='rabbitMQ URI (default=%s)' % rabbitmq_uri.replace("%", "%%"))
self.parser.add_argument('--rabbitmqQUEUE', nargs='?', dest='rabbitmq_queuename',
Expand Down Expand Up @@ -173,9 +181,17 @@ def start(self):
mounted_paths=json.loads(self.args.mounted_paths),
clowder_url=self.args.clowder_url,
max_retry=self.args.max_retry,
heartbeat=self.args.heartbeat)
heartbeat=self.args.heartbeat,
extractor_key=self.args.extractor_key,
clowder_email=self.args.clowder_email)
connector.connect()
connector.register_extractor(self.args.registration_endpoints)

url = "%sapi/extractors" % self.args.clowder_url
if url not in connector.registered_clowder:
connector.register_extractor("%s" % (url))
connector.registered_clowder.append(url)

threading.Thread(target=connector.listen, name="RabbitMQConnector").start()

elif self.args.connector == "HPC":
Expand Down