Source code for emva1288.process.routines

# -*- coding: utf-8 -*-
# Copyright (c) 2014 The EMVA1288 Authors. All rights reserved.
# Use of this source code is governed by a GNU GENERAL PUBLIC LICENSE that can
# be found in the LICENSE file.

"""Utils functions

"""
import numpy as np
import os
from scipy.optimize import leastsq
from lxml import etree
from PIL import Image
from collections import OrderedDict
from emva1288.camera import routines
from scipy.ndimage import convolve
import warnings
# import cv2


SIGNIFICANT_DIGITS = 7


def load_image(fname):
    img = Image.open(fname)
    img = np.asarray(img.split()[0])
    # img = cv2.imread(fname, cv2.CV_LOAD_IMAGE_UNCHANGED)
    # img = cv2.split(img)[0]
    return img


def get_int_imgs(imgs):
    '''Returns the sum, pseudo-variance and mean difference from a list of
    images

    sum is just the image resulting from the addition of all the images

    pvar is the pseudo-variance, defined by:
    pvar = L * (L * SUM(i ** 2) - (SUM(i)) ** 2)
    to get the variance from the pseudo-variance:
    var = (1 / L ** 2) * (1 / (L - 1)) * pvar

    dmean is the mean difference between consecutive images, obtained by:
    SUM(mean(i) - mean(i - 1)) for i > 0
    It is used to remove mu_y ** 2 from the total variance, an important
    value to get sigma_y (temporal variance).

    Keys and values in the output dict:
    L : number of images used for the computation --> int
    sum : total summed image [M x N] --> int64 array
    pvar : summed variance image [M x N] --> int64 array
    dmean : summed mean differences --> float
    '''
    L = len(imgs)  # number of images

    sum_ = 0
    sq_ = 0
    dmean_ = 0
    for n, img in enumerate(imgs):
        # we force the images to int64 to make sure we do not clip
        i = img.astype(np.int64)
        sum_ += i
        sq_ += np.square(i)
        if n > 0:
            dmean_ += np.mean(i) - np.mean(oi)
        oi = i  # old image

    # the pseudo-variance can be computed from the sum image and the sum
    # of the squared images
    var_ = L * (L * sq_ - np.square(sum_))

    return {'L': L,
            'sum': sum_,
            'pvar': var_,
            'dmean': dmean_}


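# Usage sketch (illustrative, not part of the original module): recovering
# the temporal variance from the pseudo-variance, using the conversion
# given in the docstring above. The random frames are placeholders for
# real camera images.
#
#     >>> import numpy as np
#     >>> imgs = [np.random.randint(0, 255, (4, 6)) for _ in range(2)]
#     >>> d = get_int_imgs(imgs)
#     >>> L = d['L']
#     >>> var = (1. / L ** 2) * (1. / (L - 1)) * d['pvar']

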
def LinearB0(Xi, Yi):
    X = np.asfarray(Xi)
    Y = np.asfarray(Yi)

    # we want a function y = m * x
    def fp(v, x):
        return x * v[0]

    # the error of the function: e = fp(v, x) - y
    def e(v, x, y):
        return (fp(v, x) - y)

    # the initial value of m, we choose 1, because we thought YODA would
    # have chosen 1
    v0 = [1.0]

    vr, _success = leastsq(e, v0, args=(X, Y))

    # error estimate: sqrt(sum of the squared residuals / N ** 2)
    err = np.sqrt(sum(np.square(e([vr], X, Y))) / (len(X) * len(X)))

    # Some versions of leastsq return an array, others a scalar, so here
    # we make sure it is an array
    val = np.array([vr]).flatten()

    return val, err


def LinearB(Xi, Yi):
    X = np.asfarray(Xi)
    Y = np.asfarray(Yi)

    # we want a function y = m * x + b
    def fp(v, x):
        return x * v[0] + v[1]

    # the error of the function: e = fp(v, x) - y
    def e(v, x, y):
        return (fp(v, x) - y)

    # the initial values of m and b, we choose 1, because we thought YODA
    # would have chosen 1
    v0 = np.array([1.0, 1.0])

    vr, _success = leastsq(e, v0, args=(X, Y))

    # error estimate: sqrt(sum of the squared residuals / N ** 2)
    err = np.sqrt(sum(np.square(e(vr, X, Y))) / (len(X) * len(X)))

    # print(vr, _success, err)
    return vr, err


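# Usage sketch (illustrative, not part of the original module): fitting
# y = m * x + b to made-up, exactly linear data with LinearB.
#
#     >>> import numpy as np
#     >>> x = np.arange(10.)
#     >>> y = 2.0 * x + 1.0
#     >>> (m, b), err = LinearB(x, y)   # m ~= 2.0, b ~= 1.0, err ~= 0.0

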
def GetImgShape(img):
    rows = 1

    if img.ndim == 1:
        cols, = img.shape
    else:
        rows, cols = img.shape
    return rows, cols


def FFT1288(img, rotate=False, n=1):
    '''Compute the FFT emva1288 style

    Compute an FFT per line and average the resulting ffts.

    If img is a masked array, remove the masked data line by line and
    calculate the FFT on the minimum number of valid columns.

    Parameters
    ----------
    img : array
        Input image
    rotate : bool (optional)
        Rotate the image before performing the FFT
    n : int (optional)
        If the image is the sum of several images, use this value to
        produce the fft of the average image

    Returns
    -------
    array : One dimensional FFT power spectrum
    '''
    if rotate:
        img = img.transpose()

    # remove masked entries if img is a masked array
    if isinstance(img, np.ma.masked_array):
        img = _row_wise_compress(img)

    _rows, cols = GetImgShape(img)

    # simply perform the fft on the x axis
    img = np.asfarray(img) - np.mean(img)
    fft = np.fft.fft(img, axis=1)
    fft /= np.sqrt(cols)

    fabs = np.real(fft * np.conjugate(fft))
    # extract the mean of each column of the fft
    r = np.mean(fabs, axis=0)

    # if the image is the sum of n images, divide by n ** 2 to get the
    # fft of the average image
    r /= n ** 2

    # Return only half of the spectrogram (it is symmetrical)
    return r[: cols // 2 + 1]


def _row_wise_compress(img):
    """Compress masked ndarrays row by row

    Empty (fully masked) rows are removed. The number of columns is
    truncated to the row with the least amount of valid entries.
    """
    img = img.copy()
    # use the same ndarray to save memory
    compressed = np.ma.getdata(img)
    # the maximum number of cols is given by the shape
    cols = compressed.shape[1]
    # keep count of non-empty lines
    lines = 0
    # compress line by line
    for line in img:
        # drop masked data
        line = line.compressed()
        # make sure the line is not empty
        if line.size == 0:
            continue
        # set the data in the compressed array
        compressed[lines, :line.size] = line
        # keep count of non-empty lines
        lines += 1
        # keep track of the minimum column size
        cols = min(cols, line.size)
    # trim to the actual number of lines and columns
    return compressed[:lines, :cols]


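# Usage sketch (illustrative, not part of the original module): a masked
# array where one row is fully masked and another has a masked entry; only
# the valid data survives the compression.
#
#     >>> import numpy as np
#     >>> a = np.ma.masked_equal([[1, 9, 3], [9, 9, 9], [4, 5, 6]], 9)
#     >>> b = _row_wise_compress(a)   # -> [[1, 3], [4, 5]]

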
def GetFrecs(fft):
    n = len(fft)
    x = np.arange(n)
    x = x * 1.0 / (2 * n)
    return x


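# Usage sketch (illustrative, not part of the original module): the power
# spectrum of a (random placeholder) image and its frequency axis.
#
#     >>> import numpy as np
#     >>> img = np.random.randint(0, 255, (32, 64))
#     >>> spectrum = FFT1288(img)
#     >>> freqs = GetFrecs(spectrum)   # frequencies in cycles per pixel

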
def Histogram1288(img, Qmax):
    y = np.ravel(img)

    ymin = np.min(y)
    ymax = np.max(y)

    # Because we are working with integers, the minimum binwidth is 1
    W = 1
    q = ymax - ymin
    Q = q + 1

    # When there are too many bins, create a new integer binwidth
    if Q > Qmax:
        # We want the number of bins as close as possible to Qmax (256)
        W = int(np.floor(1. * q / Qmax)) + 1
        Q = int(np.floor(1. * q / W)) + 1

    # The bins
    # we need one more value for the numpy histogram computation
    # numpy uses bin limits
    # in our interpretation we use the lower limit of the bin
    B = [ymin + (i * W) for i in range(Q + 1)]

    # Normal distribution with the original sigma and mean
    mu = np.mean(y)
    sigma = np.std(y)
    # For masked arrays, the total number of pixels is the count of the
    # non-masked values
    N = np.ma.count(y)
    normal = ((1. * (ymax - ymin) / Q) * N /
              (np.sqrt(2 * np.pi) * sigma) *
              np.exp(-0.5 * (1. / sigma * (B[:-1] - mu)) ** 2))

    #############################################
    # Reference algorithm, it's pretty slow
    # the numpy version gives the same results
    #
    # # The histogram container
    # H = np.zeros((Q,), dtype=np.int64)
    #
    # # Histogram computation
    # for yi in y:
    #     q = (yi - ymin) // W
    #     H[q] += 1
    #############################################

    H, _b = np.histogram(y, B, range=(ymin, ymax))

    return {'bins': np.asfarray(B[:-1]), 'values': H, 'model': normal}


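# Usage sketch (illustrative, not part of the original module): histogram
# of a random placeholder image, limited to at most 256 bins.
#
#     >>> import numpy as np
#     >>> img = np.random.randint(0, 1024, (100, 100))
#     >>> h = Histogram1288(img, Qmax=256)
#     >>> h['bins'].size == h['values'].size == h['model'].size
#     True

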
def cls_1288_info(cls):
    """Dictionary that represents results.

    Parameters
    ----------
    cls : class
        Class from which to extract the information.

    Returns
    -------
    dict :
        Dictionary extracted using the format defined by a custom sphinx
        directive with the following format::

            {attribute1: {'section': section name,
                          'units': attribute units,
                          'short': attribute short description,
                          'latexname': latex name for the attribute,
                          'symbol': symbol to represent the value}}
    """
    d = OrderedDict()
    items = [name for name in sorted(cls.__dict__.keys())]
    for attribute_name in items:
        # Extract the doc from the Processing methods
        doc = getattr(cls, attribute_name).__doc__
        if not doc:
            continue

        # All the lines in the docstring
        lines = [s.strip() for s in doc.splitlines()]

        # to store the relevant tag lines
        tag_lines = []
        flag = False
        for line in lines:
            # Get only those that are relevant (start with .. emva1288::)
            if line.startswith('.. emva1288::'):
                flag = True
                continue
            if flag:
                if not line.strip():
                    flag = False
                    continue
                tag_lines.append(line)

        # if there are no relevant lines skip and go to the next method
        if not tag_lines:
            continue

        # To store the info from the doc
        attribute_info = {}
        for line in tag_lines:
            tags = [x.strip() for x in line.split(':', 2) if x.strip()]
            # Each valid tag has to be xx:yy
            if len(tags) != 2:
                continue

            # Fill the dict
            attribute_info[tags[0].lower()] = tags[1]

        # If there is no section, set it as other
        attribute_info.setdefault('section', 'other')
        d[attribute_name] = attribute_info

    return d


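# Illustrative sketch (not part of the original module): the directive
# format that cls_1288_info parses out of a docstring. The class below is
# hypothetical.
#
#     >>> class Results1288Demo:
#     ...     def u_y_dark(self):
#     ...         """Dark signal
#     ...
#     ...         .. emva1288::
#     ...             :section: sensitivity
#     ...             :units: DN
#     ...             :short: Mean dark signal
#     ...         """
#     ...         return 29.4
#     >>> cls_1288_info(Results1288Demo)['u_y_dark']['section']
#     'sensitivity'

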
def _sections_first(dct):
    """For backwards compatibility with results that used section names as
    first-level keys
    """
    d = OrderedDict()

    sections = sorted({k['section'] for k in dct.values()})
    for section in sections:
        d[section] = OrderedDict()
        for k, v in dct.items():
            if v['section'] != section:
                continue
            d[section][k] = v

    return d


def obj_to_dict(obj):
    '''Get the info dict from the object class

    Add the value or data to this dict for each method:
    if the return value is a dict, it is inserted as
    d[MethodName]['data'] = ReturnValue
    if not
    d[MethodName]['value'] = ReturnValue
    '''
    d = cls_1288_info(obj.__class__)
    for attribute in d.keys():
        # Get the value for the given attribute
        val = getattr(obj, attribute)
        if callable(val):
            val = val()

        if isinstance(val, dict):
            d[attribute]['data'] = val
        else:
            d[attribute]['value'] = val

    return d


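# Usage sketch (illustrative, not part of the original module), reusing
# the hypothetical Results1288Demo class from the cls_1288_info example:
#
#     >>> d = obj_to_dict(Results1288Demo())
#     >>> d['u_y_dark']['value']
#     29.4

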
def dict_to_xml(d, root='results', filename=None):
    '''Takes a dict and returns a well formed xml string'''

    def key_to_xml(d, r):
        '''Recursive call to add the key/value pairs from the dict to the
        r element tree

        If the value is an array, join the values cast as strings with a
        whitespace separator.
        If the value is something else, it is cast as string.
        '''
        for k in d.keys():
            e = etree.SubElement(r, k)
            # when the value for the key is a dict, call the function
            # again
            if isinstance(d[k], dict):
                r.append(key_to_xml(d[k], e))
            # if the value is an array, add the values of the array as a
            # string separated by whitespaces
            # Note to self: add other array types as needed
            elif isinstance(d[k], np.ndarray):
                a = [str(x) for x in d[k]]
                e.text = ' '.join(a)
            # if something else, just add the corresponding string value
            else:
                e.text = str(d[k])
        return r

    tree = etree.Element(root)
    xml = key_to_xml(d, tree)

    t = etree.tostring(xml, pretty_print=True)
    if filename is None:
        return t
    with open(filename, 'w') as f:
        f.write(t.decode('utf-8'))


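# Usage sketch (illustrative, not part of the original module): a small
# round trip through dict_to_xml and xml_to_dict (defined below). The keys
# follow the {section: {method: {'value': v}}} layout the parser expects.
#
#     >>> d = {'sensitivity': {'u_y_dark': {'value': 29.4}}}
#     >>> x = dict_to_xml(d)
#     >>> xml_to_dict(x)['sensitivity']['u_y_dark']['value']
#     29.4

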
def xml_to_dict(xml):
    '''If xml is a file, open and parse it; if it is a string, parse it
    directly

    Convert the xml to a dict using element_to_dict

    Process the resulting dict:
    Cast data to numpy float arrays (split the strings by whitespaces)
    Cast value to float
    '''
    try:
        if os.path.isfile(xml):
            tree = etree.parse(xml)
        else:
            tree = etree.ElementTree(etree.fromstring(xml))
    except Exception:
        print('Problems loading XML')
        return None

    def element_to_dict(r):
        '''Recursive call to add dictionary elements from the r xml
        element
        '''
        dout = {}
        for child in r:
            # The resulting keys are forced to lowercase to compensate
            # for some windows versions that use CamelCase
            if list(child):
                # if the element has children, call the function again
                # with the children as r
                dout[child.tag.lower()] = element_to_dict(child)
            else:
                dout[child.tag.lower()] = child.text
        return dout

    root = tree.getroot()
    d = element_to_dict(root)

    # loop to reconstruct arrays from strings in data elements
    for section, method in d.items():
        for methodname, value in method.items():
            if 'data' in value:
                for data in value['data']:
                    v = value['data'][data]
                    v = v.strip()
                    v = v.split()
                    d[section][methodname]['data'][data] = np.asfarray(v)
            else:
                v = value['value']
                if v in ('None', None, 'none'):
                    v = None
                else:
                    # sometimes the decimal point is written with ','
                    # instead of '.'
                    v = v.replace(',', '.')
                    v = float(v)
                d[section][methodname]['value'] = v

    return d


def round_significant(v, sig=SIGNIFICANT_DIGITS):
    '''Round to the given number of significant digits, used for
    comparison
    '''
    if v == 0.0:
        return 0.0
    return round(v, sig - int(np.floor(np.log10(np.abs(v)))) - 1)


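# Usage sketch (illustrative, not part of the original module):
#
#     >>> round_significant(123456789.0)   # 7 significant digits
#     123456800.0
#     >>> round_significant(0.000123456789, sig=3)
#     0.000123

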
round_array = np.vectorize(round_significant)


def compare_xml(x1, x2, filename=None):
    # load the xml files into dicts
    f1 = xml_to_dict(x1)
    f2 = xml_to_dict(x2)

    s = ''
    # if something went wrong abort
    if f1 is None or f2 is None:
        return s

    c1 = list(f1.keys())
    c2 = list(f2.keys())

    # loop through the combined categories
    categories = set(c1) | set(c2)
    for category in sorted(categories):
        s += '*' * 70 + '\n'
        s += category + '\n'
        s += '*' * 70 + '\n'

        # check if a category is missing in one of the dicts
        if category not in c1 or category not in c2:
            t1 = category in c1
            t2 = category in c2
            s += '{0:<35}'.format('PRESENT')
            s += '{0:<20}{1:<20}FAIL'.format(str(t1), str(t2)) + '\n'
            continue

        m1 = f1[category].keys()
        m2 = f2[category].keys()

        # loop through the combined methodnames
        methodnames = set(m1) | set(m2)
        for methodname in sorted(methodnames):
            s += '{0:<35}'.format(methodname)

            # check if the methodname is in the dict
            if methodname not in m1:
                v1 = None
                a1 = None
            # get the value and the data
            else:
                v1 = f1[category][methodname].get('value', None)
                a1 = f1[category][methodname].get('data', None)

            if methodname not in m2:
                v2 = None
                a2 = None
            else:
                v2 = f2[category][methodname].get('value', None)
                a2 = f2[category][methodname].get('data', None)

            # first we check for values and then for data
            # if both are present only the values are taken into account
            # for the comparison

            # If both are values
            if v1 is not None and v2 is not None:
                try:
                    r = (round_significant(v1) -
                         round_significant(v2)) == 0.0
                except Exception:
                    r = False
                t1 = v1
                t2 = v2
            # if both are arrays
            elif a1 is not None and a2 is not None:
                k1 = a1.keys()
                k2 = a2.keys()
                t1 = 'Array'
                t2 = 'Array'
                # if the keys differ, the comparison is invalid
                if set(k1) ^ set(k2):
                    r = False
                else:
                    # default if there are no keys to compare
                    r = False
                    # loop through the keys
                    for k in k1:
                        try:
                            r = np.max(
                                np.abs(round_array(a1[k]) -
                                       round_array(a2[k]))) == 0.0
                        except Exception:
                            r = False
                        if not r:
                            break
            else:
                t1 = str(v1)
                t2 = str(v2)
                r = False

            s += '{0:<20}{1:<20}'.format(t1, t2)
            if r:
                s += 'OK' + '\n'
            else:
                s += 'FAIL' + '\n'
        s += '\n'

    if filename is None:
        return s

    with open(filename, 'w') as f:
        f.write(s)
    return s


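# Usage sketch (illustrative, not part of the original module); the file
# names below are placeholders:
#
#     >>> report = compare_xml('results_a.xml', 'results_b.xml')
#     >>> print(report)   # per-category table of OK/FAIL comparisons

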
def high_pass_filter(img, dim):
    """High pass filtering on image

    Computes the highpass filtering as defined by the emva standard,
    keeping the result as an int image. The computation preserves the
    image as ints and doesn't perform the final division, but returns
    the dividing factor.

    Parameters
    ----------
    img : np.array
        The image to filter
    dim : int
        The size of the highpass filter

    Returns
    -------
    d : dict
        The data dictionary of the result. The keys are:
        'img' : The filtered image
        'multiplicator' : Factor from the convolution to be considered
                          in the computation of the histogram
    """
    # By definition in the emva standard, the high pass filter uses the
    # following operation:
    #
    # y' = y - R (*) y
    #
    # where (*) represents a convolution and R is a 5x5 matrix filled
    # with 1/25.
    # In order to maintain int values for a higher precision, we make the
    # following manipulation:
    #
    # y' = P (*) y - R (*) y, where P is a 5x5 matrix of zeros with
    # P[2, 2] = 1
    #
    # multiplying the right side of the equation by 25 and then dividing
    # by 25:
    #
    # y' = ((25 * P - R') (*) y) / 25, where R' is a 5x5 matrix of ones
    #
    # simplifying:
    #
    # y' = (M (*) y) / 25, where M is a 5x5 matrix of -1 with M[2, 2] = 24
    #
    # We don't perform the division to keep the int precision, but we
    # return the factor (25).

    # If the image is from a color camera, the data to compute comes as a
    # masked array. Convolution cannot typically be performed on a masked
    # array, but considering this particular operation we can work around
    # the problem.
    # Considering the kernel, the unmasked values are correctly computed
    # by convolve(), while the others are wrong. By re-applying the mask,
    # we work around this problem.
    # The multiplicator must be changed to the number of valid pixels
    # being considered in the convolution.

    # Check validity of the kernel size
    if not dim % 2:
        raise ValueError('The size of the highpass filter must be odd')

    # Initialize the kernel
    kernel = np.ones((dim, dim))
    sl = dim // 2

    if isinstance(img, np.ma.MaskedArray):
        # Save the mask
        mask = np.ma.getmask(img)

        # The next few lines determine the number of pixels being
        # considered in the convolution
        value = np.ones(img.shape)
        # Apply the mask on a matrix of ones and fill the masked values
        # with 0
        value = np.ma.MaskedArray(value, mask)
        value = value.filled(0)
        # The convolution of this matrix with a matrix of ones returns a
        # matrix where the unmasked positions correspond to the number of
        # considered pixels
        value = convolve(value, kernel)
        # Slice off values that are invalid after the convolution
        value = np.ma.MaskedArray(value, mask)[sl:-sl, sl:-sl]

        # After applying the mask all the values should be equal. If not,
        # the filter is not bayer and will not work properly with this
        # code
        if value.max() != value.min():
            raise ValueError('Masked image does not have a valid bayer '
                             'filter')

        # Set the kernel
        kernel *= -1
        kernel[sl, sl] = value.max() - 1

        # Finally compute the filtering and slice off the edges
        img = img.filled(0)
        img = convolve(img, kernel)
        img = np.ma.array(img, mask=mask)[sl:-sl, sl:-sl]

    else:
        # Set the kernel and compute, then slice off the edges
        kernel *= -1
        kernel[sl, sl] = dim ** 2 - 1
        img = convolve(img, kernel)[sl:-sl, sl:-sl]

    return {'img': img, 'multiplicator': kernel[sl, sl] + 1}


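# Usage sketch (illustrative, not part of the original module): filtering
# a monochrome placeholder image with a 5x5 highpass; the returned factor
# is the division that was deliberately skipped.
#
#     >>> import numpy as np
#     >>> img = np.random.randint(0, 255, (20, 20))
#     >>> res = high_pass_filter(img, 5)
#     >>> factor = res['multiplicator']   # 25.0, i.e. 5 * 5
#     >>> filtered = res['img'] / factor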