Skip to content

Commit

Permalink
distributed performance metric collection framework (#589)
Browse files Browse the repository at this point in the history
  • Loading branch information
Amit Roushan authored Jun 22, 2021
1 parent ce9c853 commit d4a4a25
Show file tree
Hide file tree
Showing 19 changed files with 526 additions and 85 deletions.
4 changes: 4 additions & 0 deletions delfin/cmd/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,11 @@ def main():

task_server = service.TaskService.create(binary='delfin-task',
coordination=True)
leader_election = service.LeaderElectionService.create()

service.serve(task_server)
service.serve(leader_election)

service.wait()


Expand Down
81 changes: 80 additions & 1 deletion delfin/coordination.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@
help='The backend server for distributed coordination.'),
cfg.IntOpt('expiration',
default=100,
help='The expiration(in second) of the lock.')
help='The expiration(in second) of the lock.'),
cfg.IntOpt('lease_timeout',
default=15,
help='The expiration(in second) of the lock.'),
]

CONF = cfg.CONF
Expand Down Expand Up @@ -77,6 +80,10 @@ def start(self):
# NOTE(gouthamr): Tooz expects member_id as a byte string.
member_id = (self.prefix + self.agent_id).encode('ascii')

LOG.info('Started Coordinator (Agent ID: %(agent)s, prefix: '
'%(prefix)s)', {'agent': self.agent_id,
'prefix': self.prefix})

backend_url = _get_redis_backend_url()
self.coordinator = coordination.get_coordinator(
backend_url, member_id,
Expand Down Expand Up @@ -112,6 +119,78 @@ def get_lock(self, name):
LOCK_COORDINATOR = Coordinator(prefix='delfin-')


class LeaderElectionCoordinator(Coordinator):

def __init__(self, agent_id=None):
super(LeaderElectionCoordinator, self). \
__init__(agent_id=agent_id, prefix="leader_election")
self.group = None

def start(self):
"""Connect to coordination back end."""
if self.started:
return

# NOTE(gouthamr): Tooz expects member_id as a byte string.
member_id = (self.prefix + "-" + self.agent_id).encode('ascii')
LOG.info('Started Coordinator (Agent ID: %(agent)s, '
'prefix: %(prefix)s)', {'agent': self.agent_id,
'prefix': self.prefix})

backend_url = _get_redis_backend_url()
self.coordinator = coordination.get_coordinator(
backend_url, member_id,
timeout=CONF.coordination.lease_timeout)
self.coordinator.start()
self.started = True

def ensure_group(self, group):
req = self.coordinator.get_groups()
groups = req.get()
try:
# Check if group exist
groups.index(group)
except Exception:
# Create a group if not exist
LOG.debug("Exception is expected as requested group not available "
"in tooz backend. Creating the group")
request = self.coordinator.create_group(group)
request.get()
else:
LOG.info("Leader group already exist")

self.group = group

def join_group(self):
if self.group:
request = self.coordinator.join_group(self.group)
request.get()

def register_on_start_leading_callback(self, callback):
return self.coordinator.watch_elected_as_leader(self.group, callback)

def send_heartbeat(self):
return self.coordinator.heartbeat()

def start_leader_watch(self):
return self.coordinator.run_watchers()

def stop(self):
"""Disconnect from coordination back end."""
if self.started:
self.coordinator.stop()
self.coordinator = None
self.started = False

LOG.info('Stopped Coordinator (Agent ID: %(agent)s',
{'agent': self.agent_id})

def is_still_leader(self):
for acquired_lock in self.coordinator._acquired_locks:
return acquired_lock.is_still_owner()
return False


class Lock(locking.Lock):
"""Lock with dynamic name.
Expand Down
7 changes: 5 additions & 2 deletions delfin/drivers/fake_storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@
import copy
import random
import decorator

import math
import six
import time

import six
from eventlet import greenthread
from oslo_config import cfg
from oslo_log import log
from oslo_utils import uuidutils
Expand Down Expand Up @@ -89,7 +92,7 @@ def wait_random(low, high):
def _wait(f, *a, **k):
rd = random.randint(0, 100)
secs = low + (high - low) * rd / 100
time.sleep(secs)
greenthread.sleep(secs)
return f(*a, **k)

return _wait
Expand Down
Empty file.
46 changes: 46 additions & 0 deletions delfin/leader_election/factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Copyright 2021 The SODA Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from delfin.leader_election.tooz.callback import ToozLeaderElectionCallback
from delfin.leader_election.tooz.leader_elector import Elector
from delfin.task_manager.scheduler.schedule_manager import SchedulerManager

LEADER_ELECTION_KEY = "delfin-performance-metric-collection"


class LeaderElectionFactory:

@staticmethod
def construct_elector(plugin, leader_key=None):
"""
Construct leader election elector based on specified plugin
:param string plugin: required plugin for leader election
"""
# Maintain a unique key for metric collection leader election
leader_election_key = LEADER_ELECTION_KEY
if leader_key:
leader_election_key = leader_key

scheduler_mgr = SchedulerManager()

if plugin == "tooz":
# Create callback object
callback = ToozLeaderElectionCallback.register(
on_leading_callback=scheduler_mgr.start,
on_stop_callback=scheduler_mgr.stop)

return Elector(callback, leader_election_key)
else:
raise ValueError(plugin)
68 changes: 68 additions & 0 deletions delfin/leader_election/interface.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# Copyright 2021 The SODA Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Leader election interface defined"""

import six
import abc


@six.add_metaclass(abc.ABCMeta)
class LeaderCallback:

def __init__(self):
self.on_started_leading_callback = None
"""on_started_leading is called when elected as leader"""

self.on_stopped_leading_callback = None
"""on_stopped_leading is called when Leader give up its leadership"""

@abc.abstractmethod
def on_started_leading(self, *args, **kwargs):
pass

@abc.abstractmethod
def on_stopped_leading(self, *args, **kwargs):
pass

@classmethod
def register(cls, on_leading_callback, on_stop_callback):
callback = cls()
callback.on_started_leading_callback = on_leading_callback
callback.on_stopped_leading_callback = on_stop_callback
return callback


@six.add_metaclass(abc.ABCMeta)
class LeaderElector:

def __init__(self, callbacks, election_key):
self.callbacks = callbacks
self.election_key = election_key

@abc.abstractmethod
def run(self):
"""kick start leader election.
Invoke callback.on_started_leading callback once elected as leader
Invoke callback.on_stopped_leading callback once lose leadership
run returns once leader losses its leadership
"""
pass

@abc.abstractmethod
def cleanup(self):
"""Cleanup leader election residue
"""
pass
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import six
from apscheduler.schedulers.background import BackgroundScheduler
from delfin.leader_election.interface import LeaderCallback

from delfin import utils

class ToozLeaderElectionCallback(LeaderCallback):

@six.add_metaclass(utils.Singleton)
class Scheduler:
@staticmethod
def get_instance():
return Scheduler().background_scheduler
def on_started_leading(self, *args, **kwargs):
return self.on_started_leading_callback()

def __init__(self):
self.background_scheduler = BackgroundScheduler()
def on_stopped_leading(self, *args, **kwargs):
return self.on_stopped_leading_callback()
107 changes: 107 additions & 0 deletions delfin/leader_election/tooz/leader_elector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
# Copyright 2021 The SODA Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Leader elector is leased based leader election"""

import threading

from oslo_log import log
from oslo_utils import timeutils

from delfin.coordination import LeaderElectionCoordinator
from delfin.leader_election.interface import LeaderElector

LOG = log.getLogger(__name__)


class Elector(LeaderElector):

def __init__(self, callbacks, leader_election_key):
key = leader_election_key.encode('ascii')
super(Elector, self).__init__(callbacks, key)

self._coordinator = None
self.leader = False
self._stop = threading.Event()
self._runner = None

def run(self):
if self._coordinator:
return

self._stop.clear()

self._coordinator = LeaderElectionCoordinator()
self._coordinator.start()

self._coordinator.ensure_group(self.election_key)
self._coordinator.join_group()

self._coordinator. \
register_on_start_leading_callback(self.
callbacks.on_started_leading)

# Register internal callback to notify being a leader
self._coordinator. \
register_on_start_leading_callback(self.set_leader_callback)

while not self._stop.is_set():
with timeutils.StopWatch() as w:
LOG.debug("sending heartbeats for leader election")
wait_until_next_beat = self._coordinator.send_heartbeat()

ran_for = w.elapsed()
has_to_sleep_for = wait_until_next_beat - ran_for
if has_to_sleep_for < 0:
LOG.warning(
"Heart beating took too long to execute (it ran for"
" %0.2f seconds which is %0.2f seconds longer than"
" the next heartbeat idle time). This may cause"
" timeouts (in locks, leadership, ...) to"
" happen (which will not end well).", ran_for,
ran_for - wait_until_next_beat)

# Check if coordinator is still a leader
if self.leader and not self._coordinator.is_still_leader():
self.on_stopped_leading()
self.leader = False
return
self._coordinator.start_leader_watch()

if self.leader:
# Adjust time for leader
has_to_sleep_for = has_to_sleep_for / 2

LOG.debug('resting after leader watch as leader=%(leader)s '
'for heartbeat timeout of %(timeout)s sec',
{'timeout': has_to_sleep_for, 'leader': self.leader})

self._stop.wait(has_to_sleep_for)

def set_leader_callback(self, *args, **kwargs):
self.leader = True

def cleanup(self):
self._stop.set()

if self._coordinator:
self._coordinator.stop()
self._coordinator = None

if self.leader:
self.on_stopped_leading()
self.leader = False

def on_stopped_leading(self):
self.callbacks.on_stopped_leading()
Loading

0 comments on commit d4a4a25

Please sign in to comment.