Source code for ceo.GMTLIB

import sys
import math
import numpy as np
import numpy.linalg as LA
from scipy.optimize import brenth, leastsq
from scipy.signal import fftconvolve
from skimage.feature import blob_log
from scipy.ndimage.interpolation import rotate
from scipy.interpolate import griddata, LinearNDInterpolator, NearestNDInterpolator
from scipy.spatial import Delaunay
import os.path
from collections import OrderedDict
import boto3
import botocore
from . import phaseStats
from  ceo.nastran_pch_reader import nastran_pch_reader
from ceo import Source, GMT_M1, GMT_M2, ShackHartmann, GeometricShackHartmann,\
    TT7,\
    GmtMirrors, SegmentPistonSensor,\
    constants, Telescope, cuFloatArray, cuDoubleArray, Aperture,\
    Transform_to_S, Intersect, Reflect, Refract, Transform_to_R, ZernikeS

class CalibrationVault(object):

    def __init__(self,D,valid=None,nThreshold=None,insertZeros = None):
        self.D = D
        if nThreshold is None:
            self._nThreshold_ = [0]*len(D)
        else:
            self._nThreshold_ = nThreshold
        self._threshold_ = None
        if valid is None:
            self.valid = [ np.ones(X.shape[0],dtype=np.bool) for X in self.D]
        else:
            self.valid = valid
        if insertZeros is None:
            self.zeroIdx = [None]*len(self.D)
        else:
            self.zeroIdx = insertZeros
        self.UsVT = [LA.svd(X,full_matrices=False) for X in self.D]
        self.M = [ self.__recon__(X,Y,Z) for X,Y,Z in zip(self.UsVT,self._nThreshold_,self.zeroIdx) ]
            
    def __recon__(self,_UsVT_,_nThreshold_,zeroIdx):
        iS = 1./_UsVT_[1]
        if _nThreshold_>0:
            iS[-_nThreshold_:] = 0
        _M_ = np.dot(_UsVT_[2].T,np.dot(np.diag(iS),_UsVT_[0].T))
        if zeroIdx is not None:
            _M_ =  np.insert(_M_,zeroIdx,0,axis=0)
        return _M_

    @property
    def nThreshold(self):
        "# of discarded eigen values"
        return self._nThreshold_
    @nThreshold.setter
    def nThreshold(self, value):
        print("@(CalibrationMatrix)> Updating the pseudo-inverse...")
        self._nThreshold_ = value
        self.M = [ self.__recon__(X,Y,Z) for X,Y,Z in zip(self.UsVT,self._nThreshold_,self.zeroIdx) ]

    @property
    def threshold(self):
        return self._threshold_
    @threshold.setter
    def threshold(self,value):
        self._threshold_ = value
        selfqq.nThreshold = [ np.sum(X[1]<X[1][0]*value) for X in  self.UsVT ]

    @property
    def eigenValues(self):
        return [ X[1] for X in self.UsVT ]

    def dot( self, s ):
        return np.concatenate([ np.dot(X,s[Y.ravel()]) for X,Y in zip(self.M,self.valid) ])

[docs]class GMT_MX(GmtMirrors): """ A class container from GMT_M1 and GMT_M2 classes Parameters ---------- D : float The size of the pupil plane in meter D_px : int The size of the pupil plane in pixel M1_radial_order : int, optionnal The largest radial order of the Zernike polynomials on M1 segments, default to 0 M2_radial_order : int, optionnal The largest radial order of the Zernike polynomials on M2 segments, default to 0 Attributes ---------- D : float The size of the pupil plane in meter D_px : int The size of the pupil plane in pixel M1 : GMT_M1 The GMT M1 CEO class M2 : GMT_M2 The GMT M2 CEO class sphere_radius : float The curvature radius of the ray tracing reference sphere See also -------- GMT_M1 : the class for GMT M1 model GMT_M2 : the class for GMT M2 model Source : a class for astronomical sources cuFloatArray : an interface class between GPU host and device data for floats Examples -------- >>> import ceo The mandatory parameters are the size of the pupil plane in meter or in pixel >>> gmt = ceo.GMT_MX(25.5,256) If more that one source (lets say 3) is going to be propagated through the telescope: >>> gmt = ceo.GMT_MX(25.5,256, N_SRC=3) A combination of Zernike polynomials can be applied to M1 and M2 segments by specifying the largest radial order on each mirror >>> gmt = ceo.GMT_MX(25.5,256, M1_radial_order=8, M2_radial_order=14) A source is propagated (geometrically) through the telescope with the following procedure: >>> src = ceo.Source("R",rays_box_size=25.5,rays_box_sampling=256,rays_origin=[0.0,0.0,25]) >>> gmt.propagate(src) and the wavefront phase is retrieved either as a 2D map cuFloatArray object with >>> gpu_ps2d = src.phase() or as a 1D vector with >>> gpu_ps1d = src.wavefront.phase() """ def __init__(self, D=None, D_px=None, M1_radial_order=0, M2_radial_order=0, M1_mirror_modes=u"zernike", M2_mirror_modes=u"zernike", M1_N_MODE=0 ,M2_N_MODE=0, M1_mirror_modes_data=None, M2_mirror_modes_data=None): if type(M2_mirror_modes) is dict: suit = OrderedDict() suit['Ni'] = np.array(M2_mirror_modes['Ni'],dtype=np.int32) suit['L'] = np.array(M2_mirror_modes['L'],dtype=np.double) suit['N_SET'] = np.array(M2_mirror_modes['N_SET'],dtype=np.int32) suit['N_MODE'] = np.array(M2_mirror_modes['N_MODE'],dtype=np.int32) suit['s2b'] = np.array(M2_mirror_modes['s2b'],dtype=np.int32) suit['M'] = np.zeros((suit['Ni']**2*suit['N_SET']*suit['N_MODE'])) data_SET = None if 'S3 bucket' in M2_mirror_modes: BUCKET_NAME = M2_mirror_modes['S3 bucket'] KEY = M2_mirror_modes['S3 key'] file_ext = KEY.split('.')[-1] FILE = os.path.join('/tmp','data.'+file_ext) s3 = boto3.resource('s3', region_name='us-west-2') print('Downloading %s...!'%(BUCKET_NAME+'/'+KEY)) try: s3.Bucket(BUCKET_NAME).download_file(KEY, FILE) except botocore.exceptions.ClientError as e: if e.response['Error']['Code'] == "404": print("The object does not exist.") else: raise if file_ext in ['csv','txt']: data_SET = [np.loadtxt(FILE,delimiter=',')] elif file_ext=='pch': parser = nastran_pch_reader.PchParser(FILE) data = parser.get_displacements(1) #NODES_FILE = os.path.join(') nodes = np.loadtxt('7segmentface.rtf',skiprows=3,usecols=[0,3,4,5]) nodeid = nodes[:,0] xyz = np.vstack([data[x][:3] for x in nodeid]) S = xyz[:,2] nodex, nodey, nodez = nodes[:,1]+xyz[:,0],nodes[:,2]+xyz[:,1],nodes[:,3] gmt = GMT_MX() O = gmt.M2.rigid_body_CS.origin[:] O[:,2] -= O[6,2] R = gmt.M2.rigid_body_CS.R o = np.arange(101)/101*2*np.pi r = 1.1/2 x, y = r*np.cos(o),r*np.sin(o) seg_nodex = [] seg_nodey = [] seg_nodez = [] seg_S = [] for k in range(7):
[docs] seg_nodes = R[k,:,:].T @ (np.vstack([nodex,nodey,nodez]) - O[k,:][:,None]) noder = np.hypot(seg_nodes[0,:],seg_nodes[1,:]) m = noder<r seg_nodex.append( seg_nodes[0,m] ) seg_nodey.append( seg_nodes[1,m] ) seg_nodez.append( seg_nodes[2,m] ) seg_S.append( S[m] ) Z = ZernikeS(2) Sp = [] a_ = [] modes = [1,2,3,4] data = [] for k in range(7): zr = np.hypot(seg_nodex[k],seg_nodey[k]) zo = np.arctan2(seg_nodey[k],seg_nodex[k]) gzr = cuDoubleArray(host_data=zr/zr.max()) gzo = cuDoubleArray(host_data=zo) P = [] for mode in modes: Z.reset() Z.a[0,mode-1] = 1 Z.update() P.append( Z.surface(gzr,gzo).host() ) P = np.vstack(P).T S = seg_S[k][:,None] a = LA.lstsq(P,S,rcond=None)[0] Sp.append( S- P @ a ) a_.append(a) data.append( np.vstack([seg_nodex[k],seg_nodey[k],Sp[k].ravel()]).T ) data_SET = data else: data_SET = M2_mirror_modes['DATA'] if data_SET is not None: M = np.zeros((suit['Ni']**2,suit['N_SET'])) for k_SET in range(suit['N_SET']): print('Gridding SET #%d'%k_SET) data = data_SET[k_SET] datatri = Delaunay(data[:,:2]) itpr = LinearNDInterpolator(datatri,data[:,2]) u = np.linspace(-1,1,suit['Ni'])*suit['L']*0.5 y,x = np.meshgrid(u,u) zi = itpr(x,y) idx = np.isnan(zi) if np.any(idx): itpr = NearestNDInterpolator(datatri,data[:,2]) nzi = itpr(x[idx],y[idx]) zi[idx] = nzi M[:,k_SET] = zi.ravel(); print('') suit['M'] = M.flatten(order='F') M2_mirror_modes = u"modes" path_to_modes = os.path.join( os.path.abspath(__file__).split('python')[0] , 'gmtMirrors' , M2_mirror_modes+'.ceo' ) print('Writing modes to %s...'%path_to_modes) with open(path_to_modes,'w') as f: for key in suit: suit[key].tofile(f) else: M2_mirror_modes=u"zernike" super(GMT_MX,self).__init__( M1_radial_order=M1_radial_order, M2_radial_order=M2_radial_order, M1_mirror_modes=M1_mirror_modes, M2_mirror_modes=M2_mirror_modes, M1_N_MODE=M1_N_MODE, M2_N_MODE=M2_N_MODE, M1_mirror_modes_data=M1_mirror_modes_data, M2_mirror_modes_data=M2_mirror_modes_data) def calibrate(self,wfs,gs,mirror=None,mode=None,stroke=None, first_mode=3, closed_loop_calib=False, minus_M2_TT=False, calibrationVaultKwargs=None): """ Calibrate the different degrees of freedom of the mirrors Parameters ---------- wfs : ShackHartmann, DispersedFringeSensor, etc. The wavefront sensor gs : Source The guide star mirror : string The mirror label: eiher "M1" or "M2" ("MOUNT" is also accepted and will emulate a telescope pointing error) mode : string The degrees of freedom label for M1: "global tip-tilt", "zernike", "bending modes", "Txyz", "Rxyz", "Rz", "segment tip-tilt" for M2: "global tip-tilt", "pointing neutral", "coma neutral", "zernike", "Karhunen-Loeve", "Txyz", "Rxyz", "Rz", "segment tip-tilt", "TT7 segment tip-tilt" for MOUNT: "pointing" stroke : float The amplitude of the motion """ def close_NGAOish_loop(): niter = 10 nzernall = (self.M2.modes.n_mode-1)*7 for ii in range(niter): self.cl_gs.reset() self.propagate(self.cl_gs) self.cl_wfs.reset() self.onps.reset() self.cl_wfs.analyze(self.cl_gs) slopevec = self.cl_wfs.get_measurement() onpsvec = self.onps.piston(self.cl_gs).ravel() - self.onps_signal_ref AOmeasvec = np.concatenate((slopevec, onpsvec)) myAOest1 = 0.7*np.dot(self.R_AO, AOmeasvec) self.M2.modes.a[:,1:] -= myAOest1[0:nzernall].reshape((7,-1)) self.M2.motion_CS.origin[:,2] -= myAOest1[nzernall:] self.M2.motion_CS.update() self.M2.modes.update() def close_LTAOish_loop(): niter = 10 for ii in range(niter): self.cl_gs.reset() self.propagate(self.cl_gs) self.cl_wfs.reset() self.cl_wfs.analyze(self.cl_gs) slopevec = self.cl_wfs.get_measurement() myAOest1 = 0.7*np.dot(self.R_AO, slopevec) self.M2.modes.a[:,1:] -= myAOest1.reshape((7,-1)) self.M2.modes.update() def close_AO_loop(): if self.AOtype=='NGAOish': close_NGAOish_loop() elif self.AOtype=='LTAOish': close_LTAOish_loop() def pushpull(action): def get_slopes(stroke_sign): self.reset() action(stroke_sign*stroke) if closed_loop_calib==True: close_AO_loop() gs.reset() self.propagate(gs) wfs.reset() wfs.analyze(gs) return wfs.get_measurement() s_push = get_slopes(+1) s_pull = get_slopes(-1) return 0.5*(s_push-s_pull)/stroke def pushpull_minus_M2TT(action): def close_M2_segTT_loop(): niter = 3 myTTest1 = np.zeros(14) for ii in range(niter): gs.reset() self.propagate(gs) wfs.reset() wfs.analyze(gs) slopevec = wfs.get_measurement() myTTest1 += np.dot(recmat, slopevec) myTTest = myTTest1.reshape((7,2)) for idx in range(7): self.M2.update(euler_angles= [-myTTest[idx,0],-myTTest[idx,1],0], idx=idx+1) def get_slopes(stroke_sign): self.reset() action(stroke_sign*stroke) close_M2_segTT_loop() gs.reset() self.propagate(gs) wfs.reset() wfs.analyze(gs) return wfs.get_measurement() def M1_zernike_update(_stroke_): self.M1.modes.a[kSeg,kMode] = _stroke_ self.M1.modes.update() def M2_zernike_update(_stroke_): self.M2.modes.a[kSeg,kMode] = _stroke_ self.M2.modes.update() if minus_M2_TT: pushpull = pushpull_minus_M2TT sys.stdout.write("___ %s ___ (%s)\n"%(mirror,mode)) if mirror=="M1": if mode=="global tip-tilt": D = np.zeros((wfs.get_measurement_size(),2)) D[:,0] = pushpull( lambda x : self.M1.global_tiptilt(x,0) ) D[:,1] = pushpull( lambda x : self.M1.global_tiptilt(0,x) ) if mode=="Txyz": D = np.zeros((wfs.get_measurement_size(),3*7)) idx = 0 Tx = lambda x : self.M1.update(origin=[x,0,0],euler_angles=[0,0,0],idx=kSeg) Ty = lambda x : self.M1.update(origin=[0,x,0],euler_angles=[0,0,0],idx=kSeg) Tz = lambda x : self.M1.update(origin=[0,0,x],euler_angles=[0,0,0],idx=kSeg) sys.stdout.write("Segment #:") for kSeg in range(1,8): sys.stdout.write("%d "%kSeg) D[:,idx] = pushpull( Tx ) idx += 1 D[:,idx] = pushpull( Ty ) idx += 1 #if kSeg<7: D[:,idx] = pushpull( Tz ) idx += 1 sys.stdout.write("\n") if mode=="Rxyz": D = np.zeros((wfs.get_measurement_size(),3*7-1)) idx = 0 Rx = lambda x : self.M1.update(origin=[0,0,0],euler_angles=[x,0,0],idx=kSeg) Ry = lambda x : self.M1.update(origin=[0,0,0],euler_angles=[0,x,0],idx=kSeg) Rz = lambda x : self.M1.update(origin=[0,0,0],euler_angles=[0,0,x],idx=kSeg) sys.stdout.write("Segment #:") for kSeg in range(1,8): sys.stdout.write("%d "%kSeg) D[:,idx] = pushpull( Rx ) idx += 1 D[:,idx] = pushpull( Ry ) idx += 1 if kSeg<7: D[:,idx] = pushpull( Rz ) idx += 1 sys.stdout.write("\n") if mode=="Rz": D = np.zeros((wfs.get_measurement_size(),7)) idx = 0 Rz = lambda x : self.M1.update(origin=[0,0,0],euler_angles=[0,0,x],idx=kSeg) sys.stdout.write("Segment #:") for kSeg in range(1,8): sys.stdout.write("%d "%kSeg) D[:,idx] = pushpull( Rz ) idx += 1 sys.stdout.write("\n") if mode=="segment tip-tilt": D = np.zeros((wfs.get_measurement_size(),2*7)) idx = 0 Rx = lambda x : self.M1.update(origin=[0,0,0],euler_angles=[x,0,0],idx=kSeg) Ry = lambda x : self.M1.update(origin=[0,0,0],euler_angles=[0,x,0],idx=kSeg) sys.stdout.write("Segment #:") for kSeg in range(1,8): sys.stdout.write("%d "%kSeg) D[:,idx] = pushpull( Rx ) idx += 1 D[:,idx] = pushpull( Ry ) idx += 1 sys.stdout.write("\n") if mode=="zernike": n_mode = self.M1.zernike.n_mode D = np.zeros((wfs.get_measurement_size(),(n_mode-first_mode)*7)) idx = 0 for kSeg in range(7): sys.stdout.write("Segment #%d: "%kSeg) for kMode in range(first_mode, n_mode): sys.stdout.write("%d "%(kMode+1)) D[:,idx] = np.ravel( pushpull( M1_zernike_update ) ) idx += 1 sys.stdout.write("\n") if mode=="bending modes": n_mode = self.M1.modes.n_mode D = np.zeros((wfs.get_measurement_size(),n_mode*7)) idx = 0 for kSeg in range(7): sys.stdout.write("Segment #%d: "%kSeg) for kMode in range(n_mode): sys.stdout.write("%d "%(kMode+1)) D[:,idx] = np.ravel( pushpull( M1_zernike_update ) ) idx += 1 sys.stdout.write("\n") if mode=="segment piston": n_mode = 6 D = np.zeros((wfs.get_measurement_size(),n_mode)) idx = 0 Tz = lambda x : self.M1.update(origin=[0,0,x],euler_angles=[0,0,0],idx=kSeg) sys.stdout.write("Segment #:") for kSeg in range(1,7): sys.stdout.write("%d "%kSeg) D[:,idx] = pushpull( Tz ) idx += 1 #if segment=="full": # D = D[0:6,:] sys.stdout.write("\n") if mirror=="M2": if mode=="global tip-tilt": D = np.zeros((wfs.get_measurement_size(),2)) D[:,0] = pushpull( lambda x : self.M2.global_tiptilt(x,0) ) D[:,1] = pushpull( lambda x : self.M2.global_tiptilt(0,x) ) if mode=="pointing neutral": D = np.zeros((wfs.get_measurement_size(),2)) D[:,0] = pushpull( lambda x : self.M2.pointing_neutral(x,0) ) D[:,1] = pushpull( lambda x : self.M2.pointing_neutral(0,x) ) if mode=="coma neutral": D = np.zeros((wfs.get_measurement_size(),2)) D[:,0] = pushpull( lambda x : self.M2.coma_neutral(x,0) ) D[:,1] = pushpull( lambda x : self.M2.coma_neutral(0,x) ) if mode=="Txyz": D = np.zeros((wfs.get_measurement_size(),3*7)) idx = 0 Tx = lambda x : self.M2.update(origin=[x,0,0],euler_angles=[0,0,0],idx=kSeg) Ty = lambda x : self.M2.update(origin=[0,x,0],euler_angles=[0,0,0],idx=kSeg) Tz = lambda x : self.M2.update(origin=[0,0,x],euler_angles=[0,0,0],idx=kSeg) sys.stdout.write("Segment #:") for kSeg in range(1,8): sys.stdout.write("%d "%kSeg) D[:,idx] = pushpull( Tx ) idx += 1 D[:,idx] = pushpull( Ty ) idx += 1 D[:,idx] = pushpull( Tz ) idx += 1 sys.stdout.write("\n") if mode=="Rxyz": D = np.zeros((wfs.get_measurement_size(),3*7-1)) idx = 0 Rx = lambda x : self.M2.update(origin=[0,0,0],euler_angles=[x,0,0],idx=kSeg) Ry = lambda x : self.M2.update(origin=[0,0,0],euler_angles=[0,x,0],idx=kSeg) Rz = lambda x : self.M2.update(origin=[0,0,0],euler_angles=[0,0,x],idx=kSeg) sys.stdout.write("Segment #:") for kSeg in range(1,8): sys.stdout.write("%d "%kSeg) D[:,idx] = pushpull( Rx ) idx += 1 D[:,idx] = pushpull( Ry ) idx += 1 if kSeg<7: D[:,idx] = pushpull( Rz ) idx += 1 sys.stdout.write("\n") if mode=="Rz": D = np.zeros((wfs.get_measurement_size(),7)) idx = 0 Rz = lambda x : self.M2.update(origin=[0,0,0],euler_angles=[0,0,x],idx=kSeg) sys.stdout.write("Segment #:") for kSeg in range(1,8): sys.stdout.write("%d "%kSeg) D[:,idx] = pushpull( Rz ) idx += 1 sys.stdout.write("\n") if mode=="segment tip-tilt": D = np.zeros((wfs.get_measurement_size(),2*7)) idx = 0 Rx = lambda x : self.M2.update(origin=[0,0,0],euler_angles=[x,0,0],idx=kSeg) Ry = lambda x : self.M2.update(origin=[0,0,0],euler_angles=[0,x,0],idx=kSeg) sys.stdout.write("Segment #:") for kSeg in range(1,8): sys.stdout.write("%d "%kSeg) D[:,idx] = pushpull( Rx ) idx += 1 D[:,idx] = pushpull( Ry ) idx += 1 sys.stdout.write("\n") if mode=="zernike": n_mode = self.M2.zernike.n_mode D = np.zeros((wfs.get_measurement_size(),(n_mode-first_mode)*7)) idx = 0 for kSeg in range(7): sys.stdout.write("Segment #%d: "%kSeg) for kMode in range(first_mode,n_mode): sys.stdout.write("%d "%(kMode+1)) D[:,idx] = np.ravel( pushpull( M2_zernike_update ) ) idx += 1 sys.stdout.write("\n") if mode=="Karhunen-Loeve": n_mode = self.M2.modes.n_mode D = np.zeros((wfs.get_measurement_size(),(n_mode-first_mode)*7)) idx = 0; for kSeg in range(7): sys.stdout.write("Segment #%d: "%kSeg) for kMode in range(first_mode,n_mode): sys.stdout.write("%d "%(kMode+1)) D[:,idx] = np.ravel( pushpull( M2_zernike_update ) ) idx += 1 sys.stdout.write("\n") if mode=="TT7 segment tip-tilt": D = np.zeros((14,14)) idx = 0 Rx = lambda x : self.M2.update(origin=[0,0,0],euler_angles=[x,0,0],idx=kSeg) Ry = lambda x : self.M2.update(origin=[0,0,0],euler_angles=[0,x,0],idx=kSeg) sys.stdout.write("Segment #:") for kSeg in range(1,8): sys.stdout.write("%d "%kSeg) D[:,idx] = pushpull( Rx ) idx += 1 D[:,idx] = pushpull( Ry ) idx += 1 sys.stdout.write("\n") if mode=="segment piston": n_mode = 7 D = np.zeros((wfs.get_measurement_size(),n_mode)) idx = 0 Tz = lambda x : self.M2.update(origin=[0,0,x],euler_angles=[0,0,0],idx=kSeg) sys.stdout.write("Segment #:") for kSeg in range(1,8): sys.stdout.write("%d "%kSeg) D[:,idx] = pushpull( Tz ) idx += 1 sys.stdout.write("\n") if mode=="geometric segment tip-tilt": n_meas = 14 D = np.zeros((n_meas*gs.N_SRC,14)) idx = 0 Rx = lambda x : self.M2.update(origin=[0,0,0],euler_angles=[x,0,0],idx=kSeg) Ry = lambda x : self.M2.update(origin=[0,0,0],euler_angles=[0,x,0],idx=kSeg) sys.stdout.write("Segment #:") for kSeg in range(1,8): sys.stdout.write("%d "%kSeg) D[:,idx] = pushpull( Rx ) idx += 1 D[:,idx] = pushpull( Ry ) idx += 1 sys.stdout.write("\n") if mirror=="MOUNT": if mode=="pointing": D = np.zeros((wfs.get_measurement_size(),2)) def depoint(r,o): self.pointing_error_zenith = r self.pointing_error_azimuth = o D[:,0] = pushpull( lambda x : depoint(x,0.0 ) ) D[:,1] = pushpull( lambda x : depoint(x,np.pi*0.5 ) ) depoint(0.0,0.0) sys.stdout.write("------------\n") #self[mirror].D.update({mode:D}) if calibrationVaultKwargs is None: return D else: return CalibrationVault([D],**calibrationVaultKwargs)
[docs] def PSSn(self,src,r0=16e-2,L0=25.0,zenith_distance=30, C=None,AW0=None,ref_src=None,save=False, amplitude_filter=None): """ Computes the PSSn corresponding to the current state of the telescope Parameters ---------- src : Source The source object that is propagated through the telescope, the PSSn is given at the wavelenght of the source r0 : float, optional The Fried parameter at zenith and at 500nm in meter; default: 0.15m L0 : float, optional The outer scale in meter; default: 25m zenith_distance : float, optional The angular distance of the source from zenith in degree; default: 30 degree C : ndarray, optional The atmosphere OTF; default: None AW0 : ndarray, optional The collimated telescope OTF; default: None ref_src : Source The source from which AW0 is computed save : boolean, optional If True, return in addition to the PSSn, a dictionnary with C and AW0; default: False Return ------ PSSn : float The PSSn value """ if C is None: _r0_ = r0*(src.wavelength/0.5e-6)**1.2 _r0_ = (_r0_**(-5./3.)/np.cos(zenith_distance*np.pi/180))**(-3./5.) nPx = src.rays.N_L D = src.rays.L #u = np.arange(2*nPx-1,dtype=np.float)*D/(nPx-1) #u = u-u[-1]/2 u = np.fft.fftshift(np.fft.fftfreq(2*nPx-1))*(2*nPx-1)*D/(nPx-1) x,y = np.meshgrid(u,u) rho = np.hypot(x,y) C = phaseStats.atmOTF(rho,_r0_,L0) if AW0 is None: if ref_src is None: _src_ = Source(src.band.decode(), rays_box_size=src.rays.L, rays_box_sampling=src.rays.N_L, rays_origin=[0,0,25]) state = self.state pez = self.pointing_error_zenith pea = self.pointing_error_azimuth self.reset() self.propagate(_src_) self^=state self.pointing_error_zenith = pez self.pointing_error_azimuth = pea else: _src_ = ref_src A = _src_.amplitude.host() if amplitude_filter is not None: A *= amplitude_filter F = _src_.phase.host() k = 2.*np.pi/_src_.wavelength W = A*np.exp(1j*k*F) S1 = np.fliplr(np.flipud(W)) S2 = np.conj(W) AW0 = fftconvolve(S1,S2) #src.reset() #self.propagate(src) A_ = np.dstack(np.vsplit(src.amplitude.host(),src.N_SRC)) F_ = np.dstack(np.vsplit(src.phase.host(),src.N_SRC)) k = 2.*np.pi/src.wavelength out = np.zeros(src.N_SRC) for k_SRC in range(src.N_SRC): A = A_[:,:,k_SRC] if amplitude_filter is not None: A *= amplitude_filter F = F_[:,:,k_SRC] W = A*np.exp(1j*k*F) S1 = np.fliplr(np.flipud(W)) S2 = np.conj(W) AW = fftconvolve(S1,S2) out[k_SRC] = np.sum(np.abs(AW*C)**2)/np.sum(np.abs(AW0*C)**2) if save: return (out,{'C':C,'AW0':AW0}) else: return out
## AGWS_CALIBRATE def AGWS_calibrate(self,wfs,gs,stroke=None,coupled=False,decoupled=False, withM1=True,withM2=True, fluxThreshold=0.0, filterMirrorRotation=True, includeBM=True, includeMount=False, calibrationVaultKwargs={'nThreshold':None,'insertZeros': None}): gs.reset() self.reset() self.propagate(gs) if stroke is None: stroke = [1e-6]*5 if coupled: wfs.calibrate(gs,fluxThreshold) flux = wfs.valid_lenslet.f.host() D = [] if withM1: D.append( self.calibrate(wfs,gs,mirror='M1',mode='Rxyz',stroke=stroke[0]) ) D.append( self.calibrate(wfs,gs,mirror='M1',mode='Txyz',stroke=stroke[2]) ) if withM2: D.append( self.calibrate(wfs,gs,mirror='M2',mode='Rxyz',stroke=stroke[1]) ) D.append( self.calibrate(wfs,gs,mirror='M2',mode='Txyz',stroke=stroke[3]) ) if includeBM: D.append( self.calibrate(wfs,gs,mirror='M1',mode='bending modes',stroke=stroke[4]) ) if includeMount: D.append( self.calibrate(gwfs,gs,mirror='MOUNT',mode='pointing',stroke=ceo.constants.ARCSEC2RAD) ) D = np.concatenate(D,axis=1) return CalibrationVault([D],**calibrationVaultKwargs) elif decoupled: wfs.calibrate(gs,0.0) flux = wfs.valid_lenslet.f.host() D = [] if withM1: D.append( self.calibrate(wfs,gs,mirror='M1',mode='Rxyz',stroke=stroke[0]) ) if withM2: D.append( self.calibrate(wfs,gs,mirror='M2',mode='Rxyz',stroke=stroke[1]) ) if withM1: D.append( self.calibrate(wfs,gs,mirror='M1',mode='Txyz',stroke=stroke[2]) ) if withM2: D.append( self.calibrate(wfs,gs,mirror='M2',mode='Txyz',stroke=stroke[3]) ) if includeBM: D.append( self.calibrate(wfs,gs,mirror='M1',mode='bending modes',stroke=stroke[4]) ) if not withM1 and withM2: D_s = [ np.concatenate([D[0][:,k*3:k*3+3], D[1][:,k*3:k*3+3], D[2][:,k*self.M1.modes.n_mode:(k+1)*self.M1.modes.n_mode]],axis=1) for k in range(7)] elif not withM2 and withM1: D_s = [ np.concatenate([D[0][:,k*3:k*3+3], D[1][:,k*3:k*3+3], D[2][:,k*self.M1.modes.n_mode:(k+1)*self.M1.modes.n_mode]],axis=1) for k in range(7)] elif withM1 and withM2: D_s = [ np.concatenate([D[0][:,k*3:k*3+3], D[2][:,k*3:k*3+3], D[1][:,k*3:k*3+3], D[3][:,k*3:k*3+3], D[4][:,k*self.M1.modes.n_mode:(k+1)*self.M1.modes.n_mode]],axis=1) for k in range(7)] else: D_s = [ np.concatenate([D[0][:,k*self.M1.modes.n_mode:(k+1)*self.M1.modes.n_mode]],axis=1) for k in range(7)] else: if not withM1: D_s = [ np.concatenate([D[0][:,k*3:k*3+3], D[1][:,k*3:k*3+3]],axis=1) for k in range(7)] elif not withM2: D_s = [ np.concatenate([D[0][:,k*3:k*3+3], D[1][:,k*3:k*3+3]],axis=1) for k in range(7)] else: D_s = [ np.concatenate([D[0][:,k*3:k*3+3], D[2][:,k*3:k*3+3], D[1][:,k*3:k*3+3], D[3][:,k*3:k*3+3]],axis=1) for k in range(7)] max_flux = flux.max() flux_filter = flux>fluxThreshold*max_flux flux_filter2 = np.tile(flux_filter,(2,1)) Qxy = [ np.reshape( np.sum(np.abs(D_s[k])>1e-2*np.max(np.abs(D_s[k])),axis=1)!=0 ,flux_filter2.shape ) for k in range(7) ] Q = [ np.logical_and(X,flux_filter2) for X in Qxy ] Q3 = np.dstack(Q).reshape(flux_filter2.shape + (7,)) Q3clps = np.sum(Q3,axis=2) Q3clps = Q3clps>1 VLs = [ np.logical_and(X,~Q3clps) for X in Q] D_sr = [ D_s[k][VLs[k].ravel(),:] for k in range(7) ] if filterMirrorRotation: for k in range(6): U,s,VT = LA.svd(D_sr[k][:,:6],full_matrices=False) U[:,-1] = 0 s[-1] = 0 D_sr[k][:,:6] = np.dot(U,np.dot(np.diag(s),VT)) U,s,VT = LA.svd(D_sr[k][:,6:12],full_matrices=False) U[:,-1] = 0 s[-1] = 0 D_sr[k][:,6:12] = np.dot(U,np.dot(np.diag(s),VT)) return CalibrationVault(D_sr, valid=VLs,**calibrationVaultKwargs) else: raise ValueError('"coupled" or "decoupled" must be set to True!') def cloop_calib_init(self, D, nPx, onaxis_wfs_nLenslet=60, sh_thr=0.2, AOtype=None, svd_thr=1e-9, RECdir='./'): assert AOtype == 'NGAOish' or AOtype == 'LTAOish', "AOtype should be either 'NGAOish', or 'LTAOish'" self.AOtype = AOtype #----> ON-AXIS AO SH WFS: d = D/onaxis_wfs_nLenslet self.cl_wfs = GeometricShackHartmann(onaxis_wfs_nLenslet, d, N_GS=1) self.cl_gs = Source("R",zenith=0.,azimuth=0., rays_box_size=D, rays_box_sampling=nPx, rays_origin=[0.0,0.0,25]) # Calibrate SH (valid SAs, slope null vector) self.cl_gs.reset() self.reset() self.propagate(self.cl_gs) self.cl_wfs.calibrate(self.cl_gs,sh_thr) #----> ON-AXIS SEGMENT PISTON SENSOR: if AOtype=='NGAOish': self.onps = IdealSegmentPistonSensor(self.cl_gs, D, nPx, segment='full') self.cl_gs.reset() self.reset() self.propagate(self.cl_gs) self.onps_signal_ref = self.onps.piston(self.cl_gs).ravel() #[0:6] # reference signal #-----> ON-AXIS AO SYSTEM INTERACTION MATRIX CALIBRATIONS # 1. SH - M2 Zernike modes # 2. SH - M2 SPP # 3. SPS - M2 Zernike modes (Zero matrix) # 4. SPS - M2 SPP print("Calibrating on-axis "+AOtype+" AO system for closed-loop IntMat calibration") print("--------------------------------------------------------------------------") print("\n--> on-axis SH:") # 1. SH - M2 segment Zernikes IM fname = 'IM_SHgeom'+\ '_'+self.M2.mirror_modes_type+'_nmode'+str(self.M2.modes.n_mode)+'_SHthr%1.1f.npz'%sh_thr fnameFull = os.path.normpath(os.path.join(RECdir,fname)) Zstroke = 20e-9 #m rms z_first_mode = 1 # to skip piston if os.path.isfile(fnameFull) == False: D_M2_Z = self.calibrate(self.cl_wfs, self.cl_gs, mirror="M2", mode=self.M2.mirror_modes_type, stroke=Zstroke, first_mode=z_first_mode) else: print('Reading file: '+fnameFull) ftemp = np.load(fnameFull) D_M2_Z = ftemp.f.D_M2 ftemp.close() nzernall = (D_M2_Z.shape)[1] ## number of zernike DoFs calibrated #n_zern = self.M2.modes.n_mode # Identify subapertures belonging to two adjacent segments (leading to control leakage) #QQ = D_M2_Z == 0 #LL = np.sum(QQ, axis=1) #LI = np.where( LL[0:self.cl_wfs.n_valid_lenslet] < (n_zern-1)*6) #print (" A total of %d leaking SH WFS SAs identified.\n"%(LI[0].shape)) #vlens = self.cl_wfs.valid_lenslet.f.host().ravel()>0 #idx = np.where( vlens == 1) #vlens[idx[0][LI]] = 0 #leak_slopes_idx = np.array([LI[0], LI[0]+self.cl_wfs.n_valid_lenslet]).ravel() #D_M2_Z[leak_slopes_idx,:] = 0 if AOtype=='NGAOish': # 2. SH - M2 segment piston IM PSstroke = 200e-9 #m D_M2_PS_sh = self.calibrate(self.cl_wfs, self.cl_gs, mirror="M2", mode="segment piston", stroke=PSstroke) #D_M2_PS_sh[leak_slopes_idx,:] = 0 print("\n--> on-axis SPS:") # 3. Ideal SPS - M2 segment Zernikes IM D_M2_Z_PSideal = np.zeros((7,nzernall)) #Zstroke = 20e-9 #m rms #z_first_mode = 1 # to skip some low-order modes #D_M2_Z_PSideal = self.calibrate(self.onps, self.cl_gs, mirror="M2", mode=self.M2.mirror_modes_type, stroke=Zstroke, first_mode=z_first_mode) print('AO SPS - M2 Segment Zernike IM:') print(D_M2_Z_PSideal.shape) # 4. Ideal SPS - M2 segment piston IM PSstroke = 50e-9 #m D_M2_PSideal = self.calibrate(self.onps, self.cl_gs, mirror="M2", mode="segment piston", stroke=PSstroke) #----> Create super-merger IM for "simplified NGAO control" # DoFs: segment Zernikes (Z2->Zx), segment Piston # Sensors: on-axis SH WFS, on-axis idealized Piston Sensor D_AO_SH = np.concatenate((D_M2_Z, D_M2_PS_sh), axis=1) D_AO_PS = np.concatenate((D_M2_Z_PSideal, D_M2_PSideal), axis=1) D_AO = np.concatenate((D_AO_SH, D_AO_PS), axis=0) elif AOtype=='LTAOish': D_AO = D_M2_Z print('\nOn-axis AO merged IM condition number: %f'%np.linalg.cond(D_AO)) self.R_AO = np.linalg.pinv(D_AO, rcond=svd_thr)
#return [D_M2_Z,D_M2_PS_sh, D_M2_Z_PSideal, D_M2_PSideal] ### PSSN class PSSn(object): def __init__(self,r0=16e-2,L0=25.0,zenith_distance=30): self.r0 = r0 self.r0_wavelength = 0.5e-6 self.L0 = L0 self.zenith_distance = zenith_distance self.C = None self.AW0 = None self.AW = None self.N = 0 def __call__(self, gmt, src, sigma=0, full_opd=False, reset=True): """ Computes the PSSn corresponding to the current state of the telescope Parameters ---------- src : Source The source object that is propagated through the telescope, the PSSn is given at the wavelenght of the source r0 : float, optional The Fried parameter at zenith and at 500nm in meter; default: 0.15m L0 : float, optional The outer scale in meter; default: 25m zenith_distance : float, optional The angular distance of the source from zenith in degree; default: 30 degree C : ndarray, optional The atmosphere OTF; default: None AW0 : ndarray, optional The collimated telescope OTF; default: None save : boolean, optional If True, return in addition to the PSSn, a dictionnary with C and AW0; default: False Return ------ PSSn : float The PSSn value """ if self.C is None: _r0_ = self.r0*(src.wavelength/self.r0_wavelength)**1.2 _r0_ = (_r0_**(-5./3.)/np.cos(self.zenith_distance*np.pi/180))**(-3./5.) nPx = src.rays.N_L D = src.rays.L #u = np.arange(2*nPx-1,dtype=np.float)*D/(nPx-1) #u = u-u[-1]/2 u = np.fft.fftshift(np.fft.fftfreq(2*nPx-1))*(2*nPx-1)*D/(nPx-1) x,y = np.meshgrid(u,u) rho = np.hypot(x,y) self.C = phaseStats.atmOTF(rho,_r0_,self.L0) if self.AW0 is None: _src_ = Source(src.band.decode(), rays_box_size=src.rays.L, rays_box_sampling=src.rays.N_L, rays_origin=[0,0,25]) state = gmt.state pez = gmt.pointing_error_zenith pea = gmt.pointing_error_azimuth gmt.reset() gmt.propagate(_src_) A = _src_.amplitude.host() F = _src_.phase.host() k = 2.*np.pi/_src_.wavelength W = A*np.exp(1j*k*F) S1 = np.fliplr(np.flipud(W)) S2 = np.conj(W) self.AW0 = fftconvolve(S1,S2) gmt^=state gmt.pointing_error_zenith = pez gmt.pointing_error_azimuth = pea if self.N==0: self.OTF(src) if sigma>0: nPx = src.rays.N_L D = src.rays.L u = np.fft.fftshift(np.fft.fftfreq(2*nPx-1))*(2*nPx-1)*D/(nPx-1) x,y = np.meshgrid(u,u) K = np.exp(-2*(sigma*np.pi/src.wavelength)**2*(x**2+y**2)) out = np.sum(np.abs(self.AW*K*self.C/self.N)**2)/np.sum(np.abs(self.AW0*self.C)**2) else: if full_opd: out = np.sum(np.abs(self.AW)**2)/np.sum(np.abs(self.AW0*self.C)**2) else: out = np.sum(np.abs(self.AW*self.C)**2)/np.sum(np.abs(self.AW0*self.C)**2) if reset: self.AW *= 0 self.N = 0 return out def OTF(self,src): #+src if isinstance(src,Source): A = src.amplitude.host() F = src.phase.host() src_wavelength = src.wavelength if isinstance(src,tuple): A = src[0] F = src[1] src_wavelength = src[2] k = 2.*np.pi/src_wavelength W = A*np.exp(1j*k*F) S1 = np.fliplr(np.flipud(W)) S2 = np.conj(W) if self.AW is None: self.AW = np.zeros_like(self.AW0) _AW_ = fftconvolve(S1,S2) self.N += 1 a = (self.N-1)/self.N b = 1/self.N self.AW = a*self.AW + b*_AW_ def OTF_integrate(self,src,processes=1): from joblib import Parallel, delayed A = src[0] F = src[1] src_wavelength = src[2] N_F = len(F) n_job = int(N_F/processes) a = [k for k in range(0,processes*n_job,n_job)] b = [k for k in range(n_job,processes*(n_job+1),n_job)] b[-1] = N_F Fsplit = [F[x:y] for x,y in zip(a,b)] params = zip(Fsplit,a,b) out = Parallel(n_jobs=processes)(delayed(integrate)(x) for x in params) self.AW = np.dstack(out).sum(2) self.N += N_F def integrate(params): from scipy.signal import fftconvolve _F_,a,b = params _AW_=0 for l in range(a,b): k = 2.*np.pi/src_wavelength W = np.exp(1j*k*_F_[l-a]) S1 = np.fliplr(np.flipud(W)) S2 = np.conj(W) _AW_ += fftconvolve(S1,S2) return _AW_ # JGMT_MX from .utilities import JSONAbstract class JGMT_MX(JSONAbstract,GMT_MX): def __init__(self, jprms = None, jsonfile = None): print("@(ceo.JGMT_MX)>") JSONAbstract.__init__(self,jprms=jprms, jsonfile=jsonfile) GMT_MX.__init__(self,self.jprms["pupil size"], self.jprms["pupil sampling"], M1_radial_order=self.jprms["M1"]["Zernike radial order"], M2_radial_order=self.jprms["M2"]["Zernike radial order"]) class SHTT7(ShackHartmann): def __init__(self, N_SIDE_LENSLET, N_PX_LENSLET, d, DFT_osf=2, N_PX_IMAGE=None, BIN_IMAGE=1, N_GS=1): ShackHartmann.__init__(self,N_SIDE_LENSLET, N_PX_LENSLET, d, DFT_osf=DFT_osf, N_PX_IMAGE=N_PX_IMAGE, BIN_IMAGE=BIN_IMAGE, N_GS=N_GS) def calibrate(self, gs, gmt, stroke_pixel=2, slopes_threshold=0.95): gs.reset() gmt.reset() ShackHartmann.reset(self) gmt.propagate(gs) flux_threshold = 0.95 ShackHartmann.calibrate(self, gs, flux_threshold) nvl = self.n_valid_lenslet self.M = np.zeros((nvl,7)) mas2rad = 1e-3*math.pi/180/3600 px_scale = self.pixel_scale print("TT7 pixel scale: %.2fmas"%(px_scale/mas2rad)) stroke = stroke_pixel*px_scale slopes_cut = stroke_pixel**2*2*slopes_threshold for k in range(1,8): gs.reset() gmt.reset() ShackHartmann.reset(self) gmt.M1.update(euler_angles=[stroke,stroke,0.0],idx=k) gmt.propagate(gs) ShackHartmann.analyze(self,gs) c = self.valid_slopes.host().flatten()/px_scale rho2_c = c[:nvl]**2 + c[nvl:]**2 slopes_cut = rho2_c.max()*slopes_threshold self.M[:,k-1] = np.array(rho2_c>slopes_cut,dtype=np.float) gs.reset() gmt.reset() ShackHartmann.reset(self) def analyze(self, gs): ShackHartmann.analyze(self,gs) nvl = self.n_valid_lenslet c = self.valid_slopes.host() w = np.sum(self.M,axis=0) self.c7 = np.concatenate((np.dot(c[0,:nvl],self.M)/w, np.dot(c[0,nvl:],self.M)/w)) from abc import ABCMeta, abstractmethod class Sensor: __metaclass__ = ABCMeta @abstractmethod def calibrate(self): pass @abstractmethod def reset(self): pass @abstractmethod def analyze(self): pass @abstractmethod def propagate(self): pass @abstractmethod def process(self): pass @abstractmethod def get_measurement(self): pass @abstractmethod def get_measurement_size(self): pass def __pos__(self): pass class GeometricTT7(Sensor): def __init__(self,**kwargs): self.n_valid_slopes = 14 self.reference_slopes = np.zeros((14,1)) def calibrate(self, src, threshold=None): data = src.segmentsWavefrontGradient() self.reference_slopes = data def reset(self): pass def analyze(self, src): data = src.segmentsWavefrontGradient() self.valid_slopes = data - self.reference_slopes def propagate(self, src): data = src.segmentsWavefrontGradient() self.valid_slopes = data - self.reference_slopes def process(self): pass def get_measurement(self): return self.valid_slopes.ravel() def get_measurement_size(self): return 14 @property def data(self): return self.valid_slopes.ravel() @property def Data(self): return self.valid_slopes.reshape(14,1)
[docs]class DispersedFringeSensor(SegmentPistonSensor): """ A class for the GMT Dispersed Fringe Sensor. This class inherits from the SegmentPistonSensor class. Parameters ---------- Same parameters as in SegmentPistonSensor class. Attributes ---------- INIT_ALL_ATTRIBUTES : bool ; Default: False If True, additional attributes (mainly for display and debugging) will be created. See list of Additional Attributes below. fftlet_rotation : float ; vector with 12xN_SRC elements The angle of the line joining the center of the three lobes of the fftlet image. Init by calibrate() method. lobe_detection : string ; default: 'gaussfit' Algorithm for lobe detection, either 'gaussfit' for 2D gaussian fit, or 'peak_value' for peak detection. spsmask : bool Data cube containing the masks (one for each fftlet) required to isolate the "detection blob", i.e. the upper-most lobe from which the measurement will be computed. Init by calibrate() method. measurement : float Dispersed Fringe Sensor output measurement vector; y-coordinate of the detection blob in the rotated reference frame (i.e. the reference frame having the x-axis passing through the three lobe peaks on a fftlet image, and the y-axis perpendicular to it. Units: pixels in the fftlet image plane. Attributes (Additional) ----------------------- blob_data : float fftlet peak detection data; blob_data is a matrix containing the (x,y,radius) of the three lobes on each fftlet image. Init by calibrate() method. pl_m, pl_b : float Slope and y-intercept of the line passing through the three lobe peaks on a fftlet image. Init by calibrate() method. pp_m, pp_b : float Slope and y-intercept of the perpendicular line to the line above, passing between the central and the "detection blob" in a ffltlet image. Init by calibrate() method. fftlet_fit_params : float Gaussian fit parameters of detection blobs (Amplitude normalized to central lobe peak, y, x, width_y, width_x, rotation). Init by process() method. fftlet_fit_images : float Data cube containing best-fit 2D gaussians of detection blobs. Init by process() method. measurement_ortho : float x-coordinate of the detection blob in the rotated reference frame (i.e. the reference frame having the x-axis passing through the three lobe peaks on a fftlet image, and the y-axis perpendicular to it. Init by process() method. See also -------- SegmentPistonSensor : the super class IdealSegmentPistonSensor : the class for an idealized segment piston sensor GMT_M1 : the class for GMT M1 model Source : a class for astronomical sources cuFloatArray : an interface class between GPU host and device data for floats """ def __init__(self, M1, src, dispersion=5.0, field_of_view=3.0,nyquist_factor=1.0): SegmentPistonSensor.__init__(self, M1, src, dispersion=dispersion, field_of_view=field_of_view, nyquist_factor=nyquist_factor) self._N_SRC = src.N_SRC self.INIT_ALL_ATTRIBUTES = False self.lobe_detection = 'gaussfit'
[docs] def init_detector_mask(self, mask_size): """ Defines the circular mask to be applied over each fringe image. Parameters ---------- mask_size: float Diameter of mask in arcseconds. """ mask_size_px = mask_size / (self.pixel_scale * constants.RAD2ARCSEC) print("Size of DFS detector mask [pix]: %d"%(np.round(mask_size_px)) ) N_PX_FRINGE_IMAGE = self.camera.N_PX_IMAGE / self.camera.BIN_IMAGE scale = mask_size_px / N_PX_FRINGE_IMAGE circ = Telescope(N_PX_FRINGE_IMAGE, 1, scale=scale) circ_m = circ.f.host(shape=(N_PX_FRINGE_IMAGE,N_PX_FRINGE_IMAGE)) big_circ_m = np.tile(np.tile(circ_m,self.camera.N_SIDE_LENSLET).T,self.camera.N_SIDE_LENSLET) gpu_big_circ_m = cuFloatArray(host_data=big_circ_m) self.fft_mask.alter(gpu_big_circ_m)
[docs] def gaussian_func(self, height, center_x, center_y, width_x, width_y, rotation): """ Returns a gaussian function G(x,y) to produce a 2D Gaussian with the given parameters Parameters ---------- height : float Amplitude of the Gaussian center_x : float x-coordinates of the Gaussian's center in pixels. center_y : float y-coordinates of the Gaussian's center in pixels. width_x : float standard deviation in the x-direction in pixels. width_y : float standard deviation in the y-direction in pixels. rotation : float angle of rotation of the Gaussian (x,y) axes in degrees. """ width_x = float(np.absolute(width_x)) width_y = float(np.absolute(width_y)) rotation = np.deg2rad(rotation) def rotgauss(x,y): xp = (x-center_x) * np.cos(rotation) - (y-center_y) * np.sin(rotation) + center_x yp = (x-center_x) * np.sin(rotation) + (y-center_y) * np.cos(rotation) + center_y g = height*np.exp( -(((center_x-xp)/width_x)**2+ ((center_y-yp)/width_y)**2)/2.) return g return rotgauss
[docs] def fitgaussian(self, data): """ Fits a 2D Gaussian to the input data, and returns the Gaussian fit parameters: (amplidute, x, y, width_x, width_y, rotation) Parameters ---------- data : numpy 2D ndarray The array containing the image (i.e. the detection blob) to be fitted with a 2D Gaussian """ def moments(): total = data.sum() X, Y = np.indices(data.shape) x = (X*data).sum()/total y = (Y*data).sum()/total col = data[:, int(y)] width_x = np.sqrt(abs((np.arange(col.size)-y)**2*col).sum()/col.sum()) row = data[int(x), :] width_y = np.sqrt(abs((np.arange(row.size)-x)**2*row).sum()/row.sum()) height = data.max() return height, x, y, width_x, width_y, 0.0 params = moments() errorfunction = lambda p: np.ravel(self.gaussian_func(*p)(*np.indices(data.shape)) - data) p, success = leastsq(errorfunction, params) return p
[docs] def get_data_cube(self, data_type='fftlet'): """ Returns the DFS data (either fringe or fftlet images) in cube format Parameters ---------- data_type : string Set to "camera" to return fringes; set to "fftlet" to return fftlet images; default: fftlet """ assert data_type == 'fftlet' or data_type == 'camera' or data_type == 'pupil_masks', "data_type should be either 'fftlet', 'camera', or 'pupil_masks'" n_lenslet = self.camera.N_SIDE_LENSLET if data_type == 'fftlet': data = self.fftlet.host() n_px = self.camera.N_PX_IMAGE elif data_type == 'camera': data = self.camera.frame.host() n_px = self.camera.N_PX_IMAGE/2 elif data_type == 'pupil_masks': data = self.W.amplitude.host() n_px = (data.shape)[0] / n_lenslet dataCube = np.zeros((n_px, n_px, self._N_SRC*12)) k = 0 for j in range(n_lenslet): for i in range(n_lenslet): dataCube[:,:,k] = data[i*n_px:(i+1)*n_px, j*n_px:(j+1)*n_px] k += 1 if k == self._N_SRC*12: break if k == self._N_SRC*12: break return dataCube
[docs] def calibrate(self, src, gmt): """ Calibrates the lobe detection masks (spsmask). Parameters ---------- src : Source The Source object used for piston sensing gmt : GMT_MX The GMT object """ gmt.reset() src.reset() self.reset() gmt.propagate(src) self.propagate(src) self.fft() dataCube = self.get_data_cube(data_type='fftlet') ### Essential data self.fftlet_rotation = np.zeros(src.N_SRC*12) self.spsmask = np.zeros((self.camera.N_PX_IMAGE,self.camera.N_PX_IMAGE,src.N_SRC*12), dtype='bool') ### Additional data for visualization and debugging if self.INIT_ALL_ATTRIBUTES == True: self.blob_data = np.zeros((src.N_SRC*12, 3, 3)) self.pl_m = np.zeros((src.N_SRC*12)) self.pl_b = np.zeros((src.N_SRC*12)) self.pp_m = np.zeros((src.N_SRC*12)) self.pp_b = np.zeros((src.N_SRC*12)) for k in range(src.N_SRC*12): ### Find center coordinates of three lobes (i.e. central and two lateral ones) on each imagelet. blob_data = blob_log(dataCube[:,:,k], min_sigma=5, max_sigma=10, overlap=1, threshold=0.005*np.max(dataCube[:,:,k])) assert blob_data.shape == (3,3), "lobe detection failed" blob_data = blob_data[np.argsort(blob_data[:,0])] #order data in asceding y-coord ### The code below does the following: ### 1) Fit a line passing through the centers of the three lobes (aka pl line). ### y = pl_m * x + pl_b ### 2) Find the perpendicular to the pl line (aka pp line) passing through a point lying between ### the central and uppermost lobe (aka BORDER POINT). ### y = pp_m * x + pp_b ### BORDER POINT coordinates (pp_x, pp,y) ### separation tweaking: 0.5 will select BORDER POINT equidistant to the two lobes. separation_tweaking = 0.6 pp_py, pp_px = blob_data[1,0:2] + separation_tweaking*(blob_data[2,0:2] - blob_data[1,0:2]) if np.all(blob_data[:,1] == blob_data[0,1]): # pl line is VERTICAL pp_m = 0. self.fftlet_rotation[k] = 0. pl_m = float('inf') else: pl_m, pl_b = np.polyfit(blob_data[:,1], blob_data[:,0], 1) # pl line fitting pp_m = -1.0 / pl_m fftlet_rotation = np.arctan(pl_m) ### We know that the rotation angles are [-90, -30, 30, 90]. apriori_angles = np.array([-90,-30,30,90]) fftlet_rotation = (math.pi/180)*min(apriori_angles, key=lambda aa:abs(aa-fftlet_rotation*180/math.pi)) self.fftlet_rotation[k] = fftlet_rotation pp_m = -1.0/ np.tan(fftlet_rotation) pp_b = pp_py - pp_m * pp_px ### Define the SPS masks as the region y > pp line u = np.arange(self.camera.N_PX_IMAGE) v = np.arange(self.camera.N_PX_IMAGE) xx,yy = np.meshgrid(u,v) self.spsmask[:,:,k] = yy > xx*pp_m+pp_b if self.INIT_ALL_ATTRIBUTES == True: self.blob_data[k,:,:] = blob_data self.pl_m[k] = pl_m self.pl_b[k] = pl_b self.pp_m[k] = pp_m self.pp_b[k] = pp_b
[docs] def reset(self): """ Resets both the SPS detector frame and the fftlet buffer to zero. """ self.camera.reset() self.fftlet.reset()
[docs] def process(self): """ Processes the Dispersed Fringe Sensor detector frame """ dataCube = self.get_data_cube(data_type='fftlet') self.measurement = np.zeros(self._N_SRC*12) if self.INIT_ALL_ATTRIBUTES == True: self.fftlet_fit_params = np.zeros((6,self._N_SRC*12)) self.measurement_ortho = np.zeros(self._N_SRC*12) if self.lobe_detection == 'gaussfit': self.fftlet_fit_images = np.zeros((self.camera.N_PX_IMAGE,self.camera.N_PX_IMAGE,self._N_SRC*12)) for k in range(self._N_SRC*12): mylobe = dataCube[:,:,k] * self.spsmask[:,:,k] centralpeak = np.max(dataCube[:,:,k]) if self.lobe_detection == 'gaussfit': params = self.fitgaussian(mylobe) (height, y, x, width_y, width_x, rot) = params elif self.lobe_detection == 'peak_value': mylobe = rotate(mylobe,self.fftlet_rotation[k]*180/np.pi, reshape=False) height = np.max(mylobe) height_pos = np.argmax(mylobe) y, x = np.unravel_index(height_pos, mylobe.shape) if y < (mylobe.shape[0]-1) and x < (mylobe.shape[1]-1): dx = 0.5*(mylobe[y,x-1] - mylobe[y,x+1]) / (mylobe[y,x-1]+mylobe[y,x+1]-2*height+1e-6) dy = 0.5*(mylobe[y-1,x] - mylobe[y+1,x]) / (mylobe[y-1,x]+mylobe[y+1,x]-2*height+1e-6) x += dx y += dy width_x, width_y, rot = 0,0,0 #x1 = x * np.cos(-self.fftlet_rotation[k]) - y * np.sin(-self.fftlet_rotation[k]) #y1 = x * np.sin(-self.fftlet_rotation[k]) + y * np.cos(-self.fftlet_rotation[k]) y1 = y x1 = x self.measurement[k] = y1 if self.INIT_ALL_ATTRIBUTES == True: self.measurement_ortho[k] = x1 self.fftlet_fit_params[:,k] = (height / centralpeak, y, x, width_y, width_x, rot) if self.lobe_detection == 'gaussfit': fftlet_shape = (self.camera.N_PX_IMAGE,self.camera.N_PX_IMAGE) self.fftlet_fit_images[:,:,k] = self.gaussian_func(*params)(*np.indices(fftlet_shape))
[docs] def analyze(self, src): """ Propagates the guide star to the SPS detector (noiseless) and processes the frame Parameters ---------- src : Source The piston sensing guide star object """ self.reset() self.propagate(src) self.fft() self.process()
[docs] def piston(self, src): """ Return M1 differential piston. This method was created to provide compatibility with the IdealSegmentPistonSensor Piston method. Parameters ---------- src : Source The piston sensing guide star object Return ------ p : numpy ndarray A 12 element differential piston vector """ self.analyze(src) p = self.measurement.reshape(-1,12) return p
[docs] def get_measurement(self): """ Returns the measurement vector """ return self.measurement.ravel()
[docs] def get_measurement_size(self): """ Returns the size of the measurement vector """ return self._N_SRC*12
[docs]class IdealSegmentPistonSensor: """ A class for the GMT segment piston sensor Parameters ---------- src : Source The Source object used for piston sensing D : float Telescope diameter (m) nPx : integer Pupil linear sampling (pixels) W : float, optional The width of the lenslet; default: 1.5m L : float, optional The length of the lenslet; default: 1.5m segment : string "full" for piston on the entire segments or "edge" for the differential piston between segment. Attributes ---------- P : numpy ndarray M1 segment mask as a 7 columns array rc : float The radius of the circle where are centered the first 6 lenslets rp : float The radius of the circle where are centered the last 6 lenslets W : float The width of the lenslet L : float The length of the lenslet M : numpy ndarray The mask corresponding to the 12 lenslet array as a 12 columns array segment : string "full" for piston on the entire segments or "edge" for the differential piston between segment. See also -------- GMT_MX : a class for GMT M1 and M2 mirrors Source : a class for astronomical sources Examples -------- >>> import ceo >>> nPx = 256 >>> D = 25.5 >>> src = ceo.Source("R",rays_box_size=D,rays_box_sampling=nPx,rays_origin=[0.0,0.0,25]) >>> gmt = ceo.GMT_MX(D,nPx) >>> src.reset() >>> gmt.propagate(src) The piston per M1 segment is obtained with >>> SPS = ceo.IdealSegmentPistonSensor(src,D,nPx,segment='full') >>> SPS.piston(src) The 12 differential pistons are given by >>> SPS = ceo.IdealSegmentPistonSensor(src,D,nPx,segment='edge') >>> SPS.piston(src) """ def __init__(self, src, D, D_px, W=1.5, L=1.5, segment=None): assert segment=="full" or segment=="edge", "segment parameter is either ""full"" or ""edge""" self.segment = segment self._N_SRC = src.N_SRC def ROT(o): return np.array([ [ math.cos(o), math.sin(o)], [-math.sin(o),math.cos(o)] ]) n = D_px R = D/2 u = np.linspace(-1,1,n)*R x,y = np.meshgrid(u,u) xy = np.array( [ x.flatten(), y.flatten()] ) self.rc = 4.387 xy_rc = np.array([[0],[self.rc]]) #print xy_rc self.rp = 7.543 xy_rp = np.array([[self.rp],[0]]) #print xy_rp self.W = W self.L = L self.M = [] for k_SRC in range(src.N_SRC): xySrc = 82.5*np.array( [[src.zenith[k_SRC]*math.cos(src.azimuth[k_SRC])], [src.zenith[k_SRC]*math.sin(src.azimuth[k_SRC])]] ) _M_ = [] for k in range(6): theta = -k*math.pi/3 #print ROT(theta) xyp = np.dot(ROT(theta),xy - xySrc) - xy_rc _M_.append( np.logical_and( np.abs(xyp[0,:])<self.L/2, np.abs(xyp[1,:])<self.W/2 ) ) for k in range(6): theta = (1-k)*math.pi/3 #print ROT(theta) xyp = np.dot(ROT(theta),xy - xySrc) - xy_rp _M_.append( np.logical_and( np.abs(xyp[0,:])<self.L/2, np.abs(xyp[1,:])<self.W/2 ) ) self.M.append( np.array( _M_ ) ) #print self.M.shape def reset(self): pass
[docs] def piston(self,src): """ Return either M1 segment piston or M1 differential piston Parameters ---------- src : Source The piston sensing guide star object Return ------ p : numpy ndarray A 7 element piston vector for segment="full" or a 12 element differential piston vector for segment="edge" """ if self.segment=="full": p = src.piston(where='segments') elif self.segment=="edge": W = src.wavefront.phase.host() p = np.zeros((src.N_SRC,12)) for k_SRC in range(src.N_SRC): _P_ = src.rays.piston_mask[k_SRC] _M_ = self.M[k_SRC] for k in range(6): #print k,(k+1)%6 p[k_SRC,2*k] = np.sum( W[k_SRC,:]*_P_[k,:]*_M_[k,:] )/np.sum( _P_[k,:]*_M_[k,:] ) - \ np.sum( W[k_SRC,:]*_P_[6,:]*_M_[k,:] )/np.sum( _P_[6,:]*_M_[k,:] ) p[k_SRC,2*k+1] = np.sum( W[k_SRC,:]*_P_[k,:]*_M_[k+6,:] )/np.sum( _P_[k,:]*_M_[k+6,:] ) - \ np.sum( W[k_SRC,:]*_P_[(k+1)%6,:]*_M_[k+6,:] )/np.sum( _P_[(k+1)%6,:]*_M_[k+6,:] ) return p
[docs] def analyze(self, src): """ Computes either M1 segment piston or M1 differential piston (calling the "piston" method), and stores the result in the "measurement" property. """ p = self.piston(src) self.measurement = p.ravel()
[docs] def get_measurement(self): """ Returns the measurement vector """ return self.measurement
[docs] def get_measurement_size(self): """ Returns the size of the measurement vector """ if self.segment=="edge": n_meas = 12 elif self.segment=="full": n_meas = 7 return n_meas*self._N_SRC
class SegmentTipTiltSensor: """ A class for the GMT segment tip-tilt geometric sensor """ def __init__(self): pass def reset(self): pass def tiptilt(self,src): """ Return the tip and tilt of the wavefront on each segment Parameters ---------- src : Source The piston sensing guide star object Return ------ tt : numpy ndarray A 14 element array """ P = np.rollaxis(np.array( src.rays.piston_mask ),0,3) u = np.arange(src.n) v = np.arange(src.m) x,y = np.meshgrid(u,v) x = x.reshape(1,-1,1) y = y.reshape(1,-1,1) xc = np.sum(x*P,axis=1)/P.sum(axis=1) yc = np.sum(y*P,axis=1)/P.sum(axis=1) Z2 = (x - xc.reshape(7,1,src.N_SRC))*P Z3 = (y - yc.reshape(7,1,src.N_SRC))*P W = np.rollaxis( src.wavefront.phase.host(shape=(1,src.N_SRC,src.n*src.m)), 1, 3) a23 = np.zeros((14,src.N_SRC)) a23[:7,:] = np.sum(W*Z2,axis=1)/np.sum(Z2*Z2,axis=1) a23[7:,:] = np.sum(W*Z3,axis=1)/np.sum(Z3*Z3,axis=1) return a23 def analyze(self, gs): self.measurement = self.tiptilt(gs) def get_measurement(self): return self.measurement.ravel() class EdgeSensors: def __init__(self, mirror): self.M = mirror def conic(r): c = mirror.conic_c k = mirror.conic_k return c*r*r/(1+np.sqrt(1-k*(c*r)**2)) def fun(x): L = mirror.D_assembly/2 q = mirror.D_full**2 - (L-x)**2 - (conic(L)-conic(x))**2 return q p = mirror.M_ID - 1 print(p) self.rho0 = brenth(fun,mirror.D_clear/2,1+mirror.D_clear/2) k = np.arange(6) o = math.pi*( p + (-2.0*k-3.0)/6.0 ) self.x0 = (mirror.L-self.rho0)*np.cos(o) self.y0 = (mirror.L-self.rho0)*np.sin(o) R = mirror.D_full/2 o = math.pi*( p + (-2.0*k-1.0)/6.0 ) self.x1 = R*np.cos(o) self.y1 = R*np.sin(o) o = np.pi*( p + (7.0-2.0*k)/6.0 ) self.x2 = np.roll( R*np.cos(o) , -1 ) self.y2 = np.roll( R*np.sin(o) , -1 ) self.z = np.zeros(6) def read(self): u0,v0,w0 = self.M.edge_sensors(self.x0,self.y0,self.z) u1,v1,w1 = self.M.edge_sensors(self.x1,self.y1,self.z,edgeSensorId=6) u2,v2,w2 = self.M.edge_sensors(self.x2,self.y2,self.z,segId0=1,edgeSensorId=6) return np.concatenate((v0,v1-v2)),np.concatenate((w0,w1-w2)) def Trace( rays, S, global_CS=True, wavelength=550e-9): n = len(S) #rays.reset() xyz = [ rays.coordinates.host() ] for k in range(n): #print 'Material refractive index: %f'%rays.refractive_index if isinstance(S[k],Aperture): S[k].vignetting(rays) elif isinstance(S[k],(GMT_M1,GMT_M2)): S[k].trace(rays) xyz.append( rays.coordinates.host() ) elif isinstance(S[k],(GmtMirrors,GMT_MX)): #S[k].M2.blocking(rays) S[k].M1.trace(rays) xyz.append( rays.coordinates.host() ) S[k].M2.trace(rays) xyz.append( rays.coordinates.host() ) else: _S_ = S[k] Transform_to_S(rays,_S_) if not _S_.coord_break: Intersect(rays,_S_) n_S = _S_.refractive_index(wavelength) if n_S!=0: if n_S==-1: Reflect(rays) else: mu = rays.refractive_index/n_S if mu!=1.0: Refract(rays,mu) rays.refractive_index = n_S if global_CS: Transform_to_R(rays,_S_) xyz.append( rays.coordinates.host() ) vignet_idx = np.nonzero(rays.vignetting.host()[0]==0)[0] n = len(xyz) for k in range(n): xyz[k] = np.delete(xyz[k],vignet_idx,axis=0) return xyz