Source code for beagles.backend.io.config_parser

import pickle
import sys
from typing import Generator
from itertools import product
from beagles.backend.io.darknet_config_file import DarknetConfigFile
from beagles.base import SubsystemPrototype, Subsystem, register_subsystem


NAME, DEFAULT = range(2)
FILTERS = ('filters', 1)
SIZE = ('size', 1)
STRIDE = ('stride', 1)
PAD = ('pad', 0)
PADDING = ('padding', 0)
INPUT = ('input', None)
BATCHNORM = ('batch_normalize', 0)
ACTIVATION = 'activation'
OUTPUT = 'output'
FLATTEN = 'flatten'
CONNECTED = 'connected'
TYPE = 'type'
INP_SIZE = 'inp_size'
OUT_SIZE = 'out_size'
KEEP = 'keep'
BINS = 'bins'
LINEAR = 'linear'
SELECTABLE_LAY = ['[connected]', '[extract]']
EXTRACTABLE_LAY = ['[convolutional]', '[conv-extract]']
KEEP_DELIMITER = '/'

def _fix_name(section_name: str, snake_case=True, prefix: str = ''):
    name = section_name.strip('[]')
    if snake_case:
        name = name.replace('-', '_')
    name = prefix + name
    return name

def _pad(dimension, padding, size, stride):
    return (dimension + 2 * padding - size) // stride + 1

def _local_pad(dimension, padding, size, stride):
    return (dimension - 1 - (1 - padding) * (size - 1)) // stride + 1

def _load_profile(file):
    with open(file, 'rb') as f:
        profiles = pickle.load(f, encoding='latin1')[0]
    return profiles

def _list_keep(inp):
    return [int(x) for x in inp.split(',')]


[docs]class ConfigParser(SubsystemPrototype): def __init__(self, create_key, *args, **kwargs): super(ConfigParser, self).__init__(create_key, *args, **kwargs)
[docs] @classmethod def create(cls, model): config = DarknetConfigFile(model) cls.layers, cls.metadata = config.tokens cls.h, cls.w, cls.c = cls.metadata[INP_SIZE] cls.l = cls.h * cls.w * cls.c cls.flat = False cls.conv = '.conv.' in model yield cls.metadata for i, section in enumerate(cls.layers): layer_handler = cls.get_register().get(_fix_name(section[TYPE])) handler = layer_handler(cls.create_key, cls) try: yield [layer for layer in handler(section, i)][0] except TypeError: raise TypeError('Layer {} not implemented'.format(section[TYPE])) section['_size'] = list([cls.h, cls.w, cls.c, cls.l, cls.flat]) if not cls.flat: cls.metadata[OUT_SIZE] = [cls.h, cls.w, cls.c] else: cls.metadata[OUT_SIZE] = cls.l
@register_subsystem('', ConfigParser) class DarknetConfigLayer(Subsystem): def constructor(self, parser): self.parser = parser self.layer_name, *_ = self.token.keys() def seek(self, layers, i): k = 1 while self.parser.layers[i - k][TYPE] not in layers: k += 1 if i - k < 0: break return k def _get_conv_properties(self, section): n = section.get(*FILTERS) size = section.get(*SIZE) stride = section.get(*STRIDE) pad = section.get(*PAD) padding = size // 2 if pad else section.get(*PADDING) activation = section.get(ACTIVATION) batch_norm = section.get(*BATCHNORM) or self.conv return [n, size, stride, padding, batch_norm, activation] @register_subsystem('select', ConfigParser) class Select(DarknetConfigLayer): constructor = DarknetConfigLayer.constructor seek = DarknetConfigLayer.seek def __call__(self, section, i): p = self.parser if not p.flat: yield [FLATTEN, i] p.flat = True inp = section.get(*INPUT) layer = self.profile(inp, section) activation = section.get(ACTIVATION) section[KEEP] = section[KEEP].split(KEEP_DELIMITER) classes = int(section[KEEP][-1]) keep = _list_keep(section[KEEP][0]) keep_n = len(keep) train_from = classes * section[BINS] for count in range(section[BINS] - 1): keep += [num + classes for num in keep[-keep_n:]] l_ = self.select_inps(i) yield [self.layer_name, i, l_, section['old_output'], activation, layer, section['output'], keep, train_from] yield [activation, i] p.l = section['output'] def profile(self, inp, section): if type(inp) is str: file = inp.split(',')[0] layer_num = int(inp.split(',')[1]) profiles = _load_profile(section['profile']) layer = profiles[layer_num] else: layer = inp return layer def select_inps(self, i): k = self.seek(SELECTABLE_LAY, i) if i - k < 0: l_ = self.parser.l elif self.parser.layers[i - k][TYPE] == CONNECTED: l_ = self.parser.layers[i - k]['output'] else: l_ = self.parser.layers[i - k].get('old', [self.parser.l])[-1] return l_ @register_subsystem('convolutional', ConfigParser) class Convolutional(DarknetConfigLayer): constructor = DarknetConfigLayer.constructor _get_conv_properties = DarknetConfigLayer._get_conv_properties def __call__(self, section, i): p = self.parser n, size, stride, padding, batch_norm, activation = self._get_conv_properties(section) yield [self.layer_name, i, size, p.c, n, stride, padding, batch_norm, activation] yield [activation, i] w_ = _pad(p.w, padding, size, stride) h_ = _pad(p.h, padding, size, stride) p.w, p.h, p.c = w_, h_, n p.l = p.w * p.h * p.c @register_subsystem('crop', ConfigParser) class Crop(DarknetConfigLayer): constructor = DarknetConfigLayer.constructor def __call__(self, section, i): yield [self.layer_name, i] @register_subsystem('local', ConfigParser) class Local(DarknetConfigLayer): constructor = DarknetConfigLayer.constructor _get_conv_properties = DarknetConfigLayer._get_conv_properties def __call__(self, section, i): p = self.parser n, size, stride, *_, activation = self._get_conv_properties(section) pad = section.get(*PAD) w_ = _local_pad(self.w, pad, size, stride) h_ = _local_pad(self.w, pad, size, stride) yield [self.layer_name, i, size, p.c, n, stride, pad, w_, h_, activation] yield [activation, i] p.w, p.h, p.c = w_, h_, n p.l = p.w * p.h * p.c @register_subsystem('conv-extract', ConfigParser) class ConvExtract(DarknetConfigLayer): constructor = DarknetConfigLayer.constructor seek = DarknetConfigLayer.seek _get_conv_properties = DarknetConfigLayer._get_conv_properties def __call__(self, section, i): p = self.parser profiles = _load_profile(section['profile']) inp_layer = None inp = section[INPUT[NAME]] out = section['output'] inp_layer = None if inp >= 0: inp_layer = profiles[inp] if inp_layer is not None: assert len(inp_layer) == p.c, 'Conv-extract does not match input dimension' out_layer = profiles[out] n, size, stride, padding, batch_norm, activation = self._get_conv_properties(section) c_ = self.extract_channels(i) yield [self.layer_name, i, size, c_, n, stride, padding, batch_norm, activation, inp_layer, out_layer] yield [activation, i] w_ = _pad(p.w, padding, size, stride) h_ = _pad(p.h, padding, size, stride) p.w, p.h, p.c = w_, h_, len(out_layer) p.l = p.w * p.h * p.c def extract_channels(self, i): k = self.seek(EXTRACTABLE_LAY, i) if i - k >= 0: previous_layer = self.parser.layers[i - k] c_ = previous_layer['filters'] else: c_ = self.parser.c return c_ @register_subsystem('conv-select', ConfigParser) class ConvSelect(DarknetConfigLayer): constructor = DarknetConfigLayer.constructor _get_conv_properties = DarknetConfigLayer._get_conv_properties def __call__(self, section, i): p = self.parser n, size, stride, padding, *mess = self._get_conv_properties(section) section[KEEP] = section[KEEP].split(KEEP_DELIMITER) classes = int(section[KEEP][-1]) keep = _list_keep(section[KEEP][0]) segment = classes + 5 assert n % segment == 0, 'conv-select: segment failed' bins = n // segment keep_idx = list() for j in range(bins): offset = j * segment keep_idx.append([offset + k for k in range(5)]) keep_idx.append([offset + 5 + k for k in keep]) w_ = _pad(p.w, padding, size, stride) h_ = _pad(p.h, padding, size, stride) c_ = len(keep_idx) yield [self.layer_name, i, size, p.c, n, stride, padding, *mess, keep_idx, c_] p.w, p.h, p.c = w_, h_, c_ p.l = p.w * p.h * p.c @register_subsystem('maxpool', ConfigParser) class MaxPool(DarknetConfigLayer): constructor = DarknetConfigLayer.constructor def __call__(self, section, i): p = self.parser stride = section.get(*STRIDE) size = section.get(SIZE[0], stride) padding = section.get(PADDING[0], (size - 1) // 2) yield [self.layer_name, i, size, stride, padding] w_ = (p.w + 2 * padding) // section[STRIDE[0]] h_ = (p.h + 2 * padding) // section[STRIDE[0]] p.w, p.h = w_, h_ p.l = p.w * p.h * p.c @register_subsystem('connected', ConfigParser) class Connected(DarknetConfigLayer): constructor = DarknetConfigLayer.constructor def __call__(self, section, i): p = self.parser if not p.flat: yield [FLATTEN, i] p.flat = True activation = section.get(ACTIVATION) yield [self.layer_name, i, p.l, section['output'], activation] yield [activation, i] p.l = section['output'] @register_subsystem('extract', ConfigParser) class Extract(DarknetConfigLayer): constructor = DarknetConfigLayer.constructor def __call__(self, section, i): p = self.parser if not p.flat: yield [FLATTEN, i] p.flat = True activation = section.get(ACTIVATION) profiles = _load_profile(section['profile']) inp_layer = None inp = section[INPUT[NAME]] out = section['output'] if inp >= 0: inp_layer = profiles[inp] out_layer = profiles[out] old = section['old'] old = [int(x) for x in old.split(',')] if inp_layer is not None: if len(old) > 2: h_, w_, c_, n_ = old inp_layer = self.new(inp_layer, h_, w_) old = [h_ * w_ * c_, n_] if len(inp_layer) == p.l: msg = f'Extract does not match input dimension {len(inp_layer)} != {p.l}.' raise ValueError(msg) section['old'] = old yield [self.layer_name, i, *old, activation, inp_layer, out_layer] yield [activation, i] p.l = len(out_layer) def new(self, input_layer, heights, widths): range_c = range(self.parser.c) range_h = range(self.parser.h) range_w = range(self.parser.w) new_inp = list() for i in range_c: if i not in input_layer: continue new_inp += [k + widths * (j + heights * i) for j in range_h for k in range_w] return new_inp @register_subsystem('route', ConfigParser) class Route(DarknetConfigLayer): constructor = DarknetConfigLayer.constructor def __call__(self, section, i): p = self.parser routes = section['layers'] if type(routes) is int: routes = [routes] else: routes = [int(x.strip()) for x in routes.split(',')] routes = [i + x if x < 0 else x for x in routes] for j, x in enumerate(routes): lx = p.layers[x] _size = lx['_size'][:3] if j == 0: p.h, p.w, p.c = _size else: h_, w_, c_ = _size assert w_ == p.w and h_ == p.h, \ f'Routing incompatible sizes from {lx[TYPE]}({h_}x{w_}x{c_})' p.c += c_ yield [self.layer_name, i, routes] p.l = p.w * p.h * p.c @register_subsystem('shortcut', ConfigParser) class Shortcut(DarknetConfigLayer): constructor = DarknetConfigLayer.constructor def __call__(self, section, i): p = self.parser index = int(section['from']) activation = section.get(ACTIVATION) assert activation == LINEAR, 'Layer {} can only use linear activation'.format( section[TYPE]) from_layer = i + index yield [self.layer_name, i, from_layer] p.l = p.w * p.h * p.c @register_subsystem('upsample', ConfigParser) class Upsample(DarknetConfigLayer): constructor = DarknetConfigLayer.constructor def __call__(self, section, i): p = self.parser stride = section.get(*STRIDE) assert stride == 2, \ 'Layer {} can only be of stride 2'.format(section[TYPE]) w = p.w * stride h = p.h * stride yield [self.layer_name, i, stride, h, w] p.l = p.w * p.h * p.c @register_subsystem(token='reorg', prototype=ConfigParser) class Reorg(DarknetConfigLayer): constructor = DarknetConfigLayer.constructor def __call__(self, section, i): p = self.parser stride = section.get(*STRIDE) yield [self.layer_name, i, stride] p.w = p.w // stride p.h = p.h // stride p.c = p.c * (stride ** 2) p.l = p.w * p.h * p.c @register_subsystem('avgpool', ConfigParser) class AvgPool(DarknetConfigLayer): constructor = DarknetConfigLayer.constructor def __call__(self, section, i): self.parser.flat = True self.parser.l = self.parser.c yield [self.layer_name, i] @register_subsystem('dropout', ConfigParser) class Dropout(DarknetConfigLayer): constructor = DarknetConfigLayer.constructor def __call__(self, section, i): yield [self.layer_name, i, section['probability']] @register_subsystem('softmax', ConfigParser) class SoftMax(DarknetConfigLayer): constructor = DarknetConfigLayer.constructor def __call__(self, section, i): yield [self.layer_name, i, section['groups']] @register_subsystem(token='lstm', prototype=ConfigParser) class LSTM(DarknetConfigLayer): constructor = DarknetConfigLayer.constructor def __call__(self, section, i): yield [self.layer_name, i, section[OUTPUT], section.get(*BATCHNORM)] @register_subsystem(token='rnn', prototype=ConfigParser) class RNN(DarknetConfigLayer): constructor = DarknetConfigLayer.constructor def __call__(self, section, i): yield [self.layer_name, i, section[OUTPUT], section.get(*BATCHNORM), section[ACTIVATION]] \ @register_subsystem(token='gru', prototype=ConfigParser) class GRU(DarknetConfigLayer): constructor = DarknetConfigLayer.constructor def __call__(self, section, i): yield [self.layer_name, i, section[OUTPUT], section.get(*BATCHNORM)]