import yaml
import socket
import argparse
import datetime
import setproctitle
import gevent
import gevent.hub
import signal
import subprocess
import powerpool
import time
import logging
import sys
from gevent_helpers import BlockingDetector
from gevent import sleep
from gevent.monkey import patch_all
from gevent.server import DatagramServer
patch_all()
from .utils import import_helper
from .lib import MinuteStatManager, SecondStatManager, Component
from .jobmanagers import Jobmanager
from .reporters import Reporter
from .stratum_server import StratumServer
def main():
parser = argparse.ArgumentParser(description='Run powerpool!')
parser.add_argument('config', type=argparse.FileType('r'),
help='yaml configuration file to run with')
parser.add_argument('-d', '--dump-config', action="store_true",
help='print the result of the YAML configuration file and exit')
parser.add_argument('-s', '--server-number', type=int, default=0,
help='increase the configued server_number by this much')
args = parser.parse_args()
# override those defaults with a loaded yaml config
raw_config = yaml.load(args.config) or {}
if args.dump_config:
import pprint
pprint.pprint(raw_config)
exit(0)
PowerPool.from_raw_config(raw_config, vars(args)).start()
[docs]class PowerPool(Component, DatagramServer):
""" This is a singelton class that manages starting/stopping of the server,
along with all statistical counters rotation schedules. It takes the raw
config and distributes it to each module, as well as loading dynamic modules.
It also handles logging facilities by being the central logging registry.
Each module can "register" a logger with the main object, which attaches
it to configured handlers.
"""
manager = None
gl_methods = ['_tick_stats']
defaults = dict(procname="powerpool",
term_timeout=10,
extranonce_serv_size=4,
extranonce_size=4,
default_component_log_level='INFO',
loggers=[{'type': 'StreamHandler', 'level': 'NOTSET'}],
events=dict(enabled=False, port=8125, host="127.0.0.1"),
datagram=dict(enabled=False, port=6855, host="127.0.0.1"),
server_number=0,
algorithms=dict(
x11={"module": "drk_hash.getPoWHash",
"hashes_per_share": 4294967296}, # 2^32
scrypt={"module": "ltc_scrypt.getPoWHash",
"hashes_per_share": 65536}, # 2^16
scryptn={"module": "vtc_scrypt.getPoWHash",
"hashes_per_share": 65536},
blake256={"module": "blake_hash.getPoWHash",
"hashes_per_share": 65536},
sha256={"module": "cryptokit.sha256d",
"hashes_per_share": 4294967296},
lyra2re={"module": "lyra2re_hash.getPoWHash",
"hashes_per_share": 33554432} # 2^25
))
@classmethod
[docs] def from_raw_config(self, raw_config, args):
components = {}
types = [PowerPool, Reporter, Jobmanager, StratumServer]
component_types = {cls.__name__: [] for cls in types}
component_types['other'] = []
for key, config in raw_config.iteritems():
typ = import_helper(config['type'])
# Pass the commandline arguments to the manager component
if issubclass(typ, PowerPool):
config['args'] = args
obj = typ(config)
obj.key = key
for typ in types:
if isinstance(obj, typ):
component_types[typ.__name__].append(obj)
break
else:
component_types['other'].append(obj)
components[key] = obj
pp = component_types['PowerPool'][0]
assert len(component_types['PowerPool']) == 1
pp.components = components
pp.component_types = component_types
return pp
def __init__(self, config):
self._configure(config)
self._log_handlers = []
# Parse command line args
self.config['server_number'] += self.config['args']['server_number']
self.config['procname'] += "_{}".format(self.config['server_number'])
# setup all our log handlers
for log_cfg in self.config['loggers']:
if log_cfg['type'] == "StreamHandler":
kwargs = dict(stream=sys.stdout)
else:
kwargs = dict()
handler = getattr(logging, log_cfg['type'])(**kwargs)
log_level = getattr(logging, log_cfg['level'].upper())
handler.setLevel(log_level)
fmt = log_cfg.get('format', '%(asctime)s [%(name)s] [%(levelname)s] %(message)s')
formatter = logging.Formatter(fmt)
handler.setFormatter(formatter)
self._log_handlers.append((log_cfg.get('listen'), handler))
self.logger = self.register_logger(self.__class__.__name__)
setproctitle.setproctitle(self.config['procname'])
self.version = powerpool.__version__
self.version_info = powerpool.__version_info__
self.sha = getattr(powerpool, '__sha__', "unknown")
self.rev_date = getattr(powerpool, '__rev_date__', "unknown")
if self.sha == "unknown":
# try and fetch the git version information
try:
output = subprocess.check_output("git show -s --format='%ci %h'",
shell=True).strip().rsplit(" ", 1)
self.sha = output[1]
self.rev_date = output[0]
# celery won't work with this, so set some default
except Exception as e:
self.logger.info("Unable to fetch git hash info: {}".format(e))
self.algos = {}
self.server_start = datetime.datetime.utcnow()
self.logger.info("=" * 80)
self.logger.info("PowerPool stratum server ({}) starting up..."
.format(self.config['procname']))
if __debug__:
self.logger.info("Python not running in optimized mode. For best "
"performance set enviroment variable PYTHONOPTIMIZE=2")
gevent.spawn(BlockingDetector(raise_exc=False))
# Detect and load all the hash functions we can find
for name, algo_data in self.config['algorithms'].iteritems():
self.algos[name] = algo_data.copy()
self.algos[name]['name'] = name
mod = algo_data['module']
try:
self.algos[name]['module'] = import_helper(mod)
except ImportError:
self.algos[name]['module'] = None
else:
self.logger.info("Enabling {} hashing algorithm from module {}"
.format(name, mod))
self.event_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.events_enabled = self.config['events']['enabled']
if self.events_enabled:
self.logger.info("Transmitting statsd formatted stats to {}:{}".format(
self.config['events']['host'], self.config['events']['port']))
self.events_address = (self.config['events']['host'].encode('utf8'),
self.config['events']['port'])
# Setup all our stat managers
self._min_stat_counters = []
self._sec_stat_counters = []
if self.config['datagram']['enabled']:
listener = (self.config['datagram']['host'],
self.config['datagram']['port'] +
self.config['server_number'])
self.logger.info("Turning on UDP control server on {}"
.format(listener))
DatagramServer.__init__(self, listener, spawn=None)
[docs] def handle(self, data, address):
self.logger.info("Recieved new command {}".format(data))
parts = data.split(" ")
try:
component = self.components[parts[0]]
func = getattr(component, parts[1])
kwargs = {}
args = []
for arg in parts[2:]:
if "=" in arg:
k, v = arg.split("=", 1)
kwargs[k] = v
else:
args.append(arg)
if kwargs.pop('__spawn', False):
gevent.spawn(func, *args, **kwargs)
else:
func(*args, **kwargs)
except AttributeError:
self.logger.warn("Component {} doesn't have a method {}"
.format(*parts))
except KeyError:
self.logger.warn("Component {} doesn't exist".format(*parts))
except Exception:
self.logger.warn("Error in called function {}!".format(data),
exc_info=True)
[docs] def log_event(self, event):
""" Sends an event to statsd """
if self.events_enabled:
self.event_socket.sendto(event, self.events_address)
[docs] def start(self):
self.register_logger("gevent_helpers")
for comp in self.components.itervalues():
comp.manager = self
comp.counters = self.register_stat_counters(comp, comp.one_min_stats, comp.one_sec_stats)
if comp is not self:
comp.logger = self.register_logger(comp.name)
comp.start()
# Starts the greenlet
Component.start(self)
# Start the datagram control server if it's been inited
if self.config['datagram']['enabled']:
DatagramServer.start(self, )
# This is the main thread of execution, so just continue here waiting
# for exit signals
######
# Register shutdown signals
gevent.signal(signal.SIGUSR1, self.dump_objgraph)
gevent.signal(signal.SIGHUP, exit, "SIGHUP")
gevent.signal(signal.SIGINT, exit, "SIGINT")
gevent.signal(signal.SIGTERM, exit, "SIGTERM")
try:
gevent.wait()
# Allow a force exit from multiple exit signals
finally:
self.logger.info("Exiting requested, allowing {} seconds for cleanup."
.format(self.config['term_timeout']))
try:
for comp in self.components.itervalues():
self.logger.debug("Calling stop on component {}".format(comp))
comp.stop()
if gevent.wait(timeout=self.config['term_timeout']):
self.logger.info("All threads exited normally")
else:
self.logger.info("Timeout reached, shutting down forcefully")
except gevent.GreenletExit:
self.logger.info("Shutdown requested again by system, "
"exiting without cleanup")
self.logger.info("Exit")
self.logger.info("=" * 80)
[docs] def dump_objgraph(self):
""" Dump garbage collection information on SIGUSR1 to aid debugging
memory leaks """
import gc
gc.collect()
import objgraph
print "Dumping object growth ****"
objgraph.show_growth(limit=100)
print "****"
[docs] def exit(self, signal=None):
""" Handle an exit request """
self.logger.info("{} {}".format(signal, "*" * 80))
# Kill the top level greenlet
gevent.kill(gevent.hub.get_hub().parent)
@property
def status(self):
""" For display in the http monitor """
return dict(uptime=str(datetime.datetime.utcnow() - self.server_start),
server_start=str(self.server_start),
version=dict(
version=self.version,
version_info=self.version_info,
sha=self.sha,
rev_date=self.rev_date)
)
[docs] def _tick_stats(self):
""" A greenlet that handles rotation of statistics """
last_tick = int(time.time())
last_send = (last_tick // 60) * 60
while True:
now = time.time()
# time to rotate minutes?
if now > (last_send + 60):
for manager in self._min_stat_counters:
manager.tock()
for manager in self._sec_stat_counters:
manager.tock()
last_send += 60
# time to tick?
if now > (last_tick + 1):
for manager in self._sec_stat_counters:
manager.tick()
last_tick += 1
sleep(last_tick - time.time() + 1.0)
[docs] def register_logger(self, name):
logger = logging.getLogger(name)
logger.setLevel(getattr(logging, self.config['default_component_log_level']))
for keys, handler in self._log_handlers:
# If the keys are blank then we assume it wants all loggers
# registered
if not keys or name in keys:
logger.addHandler(handler)
return logger
[docs] def register_stat_counters(self, comp, min_counters, sec_counters=None):
""" Creates and adds the stat counters to internal tracking dictionaries.
These dictionaries are iterated to perform stat rotation, as well
as accessed to perform stat logging """
counters = {}
for key in min_counters:
new = MinuteStatManager()
new.owner = comp
new.key = key
counters[key] = new
self._min_stat_counters.append(new)
for key in sec_counters or []:
new = SecondStatManager()
new.owner = comp
new.key = key
counters[key] = new
self._sec_stat_counters.append(new)
return counters