Source code for raidnr.util.SpatialToolbox

#core packages 
import pandas as pd
import numpy as np, itertools, os, fnmatch
from math import sin,cos,tan,atan,sqrt
from scipy.spatial import ConvexHull
import shutil
import re
#packages for geospatial analysis 
#shapely
import shapely
from shapely.ops import unary_union, cascaded_union, polygonize, nearest_points
from shapely.geometry import Point, LineString,Polygon,shape, mapping,MultiPoint,MultiPolygon,MultiLineString
from shapely import geometry
#proj
from pyproj import Proj, transform
#geopandas
import geopandas as gp
import geopandas as gpd  #both aliases are used throughout this module
from geopandas import GeoDataFrame
#fiona
import fiona
#ogr/gdal/osr
from osgeo import ogr, gdal, osr
import glob

#bokeh packages
import bokeh
from bokeh.io import export_png
from bokeh.tile_providers import get_provider, Vendors
from bokeh.plotting import figure, show
from bokeh.models import ColumnDataSource,LogColorMapper, LogTicker, ColorBar
from bokeh.models.annotations import Title
from bokeh.palettes import Spectral6,Inferno10
from bokeh.transform import linear_cmap
import matplotlib as mpl
import subprocess

from osgeo import gdal_array
from osgeo.gdal import FillNodata
from osgeo.gdalconst import *
from scipy import misc
import warnings



#workaround for environments where multiprocessing has no semaphore prefix configured
from multiprocessing import current_process
try:
    current_process()._config
except AttributeError:
    current_process()._config = {'semprefix': '/mp'}


#KD-tree
import sklearn
from sklearn.neighbors import KDTree

from raidnr.config import CRS

import sys
cwd=os.getcwd()
src = os.path.dirname(cwd)
#append the path to the PYTHONPATH
sys.path.insert(0, src)

#import RAID modules
import raidnr
from raidnr import *
from raidnr.config import CRS, GRANULARITY, METRICS

import raidnr.bwgrid.grid_transformations as gt

from raidnr.util.ext_methods import *
import raidnr.util.vertical_alignment as va


#======================================================================-VARIABLES========================================================

#======================================================================-FUNCTIONS========================================================


# 1.0 Single Sided Buffer
def SingleSidedBuffer(line, dist, side):
    """
    Create a single-sided buffer on the left or right side (depending on the
    orientation of the line) of a LineString. The result is a shapely.Polygon.

    Parameters
    ----------
    line: a shapely.LineString object
    dist: the buffer distance | based on the units of the line's CRS
    side: a string defining the side towards which the buffer will be created | values: 'left'/'right'/'both'
    """
    if side == 'both':
        left = SingleSidedBuffer(line, dist, 'left')
        right = SingleSidedBuffer(line, dist, 'right')
        buffers = [left, right]
        buffer_union = cascaded_union(buffers)
        return buffer_union
    else:
        #the one-side buffer
        parallel = line.parallel_offset(distance=dist, side=side)
        #handling complex shapes that might derive from the buffer
        try:
            p_line = parallel[0]
            for i in range(len(parallel)):
                if len(parallel[i].coords[:]) > len(p_line.coords[:]):
                    p_line = parallel[i]
        except TypeError:
            #parallel is a single LineString rather than a collection of parts
            p_line = parallel
        #coords need to be reversed
        if side == 'left':
            p_line = LineString(list(reversed(p_line.coords[:])))
        a = LineString([line.coords[0], p_line.coords[len(p_line.coords) - 1]])  #line connector
        b = LineString([line.coords[len(line.coords) - 1], p_line.coords[0]])  #line connector
        cd = itertools.chain(b.coords[:], list(p_line.coords), a.coords[:], line.coords[:])  #create a sequence of points
        #create the polygon
        single_buffer = Polygon(list(cd))
        return single_buffer
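# Illustrative usage sketch (hypothetical values, in the units of the line's CRS):
#   >>> seg = LineString([(0, 0), (100, 0)])
#   >>> buf = SingleSidedBuffer(seg, 10, 'left')
#   >>> buf.geom_type
#   'Polygon'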
#2.0 Convert a geodataframe that contains multiple lines into one single line ***** RE-ASSESS THE FUNCTION
def OneLine(line_gdf):
    """
    Convert a geopandas GeoDataFrame of multiple LineStrings that make up a
    network into a single unified LineString. The output is a GeoDataFrame.

    Keyword arguments:
    line_gdf - geopandas geo-dataframe of multiple LineStrings
    """
    points = []
    for i in range(len(line_gdf)):
        line = line_gdf.geometry[i]
        for j in range(len(line.coords[:])):
            points.append(Point(line.coords[:][j]))
    #the points are not ordered - try to order them using the X coords - ascending
    pts_geo = GeoDataFrame(points, columns={'geometry': []})
    #order based on the X coords - from smallest to largest X coord
    order = pts_geo.geometry.x
    pts_geo['order'] = order
    new = pts_geo.sort_values(by='order', ascending=True).reset_index(drop=True)
    #create an id for the points
    new['id'] = list(range(0, len(new)))
    #drop duplicates
    x = new.geometry.x
    y = new.geometry.y
    new['x'] = x
    new['y'] = y
    dropped = new.drop_duplicates(subset=('x', 'y'), keep='first')
    list_pts = []
    for point in new.geometry:
        list_pts.append(point)
    #loop: checks all nearest points, starting from the first point (minimum X-coord)
    #each time the nearest point becomes the origin point and the origin point is removed
    #from the list so that the line is constructed from start with direction to the destination point
    ordered = []
    a = len(list_pts)
    orig = 0
    while a > 1:
        if a == len(new):
            #first iteration: start from the point with the minimum X coordinate
            orig = Point(new.geometry[0].coords[:][0])
        removed = orig
        list_pts.remove(removed)
        NN = nearest_points(orig, MultiPoint(list_pts))
        p = Point(NN[1].coords[:][0])
        ordered.append(p)
        orig = p
        a = a - 1
    oneLine = GeoDataFrame(geometry=gp.GeoSeries(LineString(ordered)))
    oneLine.crs = CRS
    return oneLine
def snap_points(pointLayer, Line):
    """
    Take a GeoDataFrame of points and snap each point to a line.
    NOTE: Line should represent one unique line (a single LineString).
    The function returns the original point GeoDataFrame with the geometry
    column replaced by the snapped point coordinates.

    pointLayer-- geopandas GeoDataFrame of 'Point' geometry
    Line-- shapely LineString (or single-feature GeoSeries) to snap to
    """
    snapped = []
    for pt in pointLayer.geometry:
        ppt = Line.interpolate(Line.project(pt))  #point projected onto the line
        snapped.append(ppt)
    pointLayer['geometry'] = snapped
    return pointLayer
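# Illustrative usage sketch (hypothetical coordinates in the module CRS):
#   >>> pts = gp.GeoDataFrame(geometry=gp.GeoSeries([Point(10, 5), Point(60, -3)]))
#   >>> track = LineString([(0, 0), (100, 0)])
#   >>> snap_points(pts, track).geometry[0].coords[:]
#   [(10.0, 0.0)]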
def clip_polyPoints(pointLayer, clipLayer):
    """
    Return a geopandas.GeoDataFrame with only the points that fall within a specified Polygon

    Parameters
    ----------
    pointLayer: geopandas.GeoDataFrame of Point features
    clipLayer: shapely.Polygon
    """
    inter = pointLayer.intersects(clipLayer)  #use 'intersects' instead of 'within'
    points_within = pointLayer.loc[inter].reset_index(drop=1)
    return points_within
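# Illustrative usage sketch (hypothetical coordinates):
#   >>> pts = gp.GeoDataFrame(geometry=gp.GeoSeries([Point(1, 1), Point(50, 50)]))
#   >>> box = Polygon([(0, 0), (10, 0), (10, 10), (0, 10)])
#   >>> len(clip_polyPoints(pts, box))
#   1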
def clip_polyLines(bboxFile, filePath, outputFile):
    """
    Clip lines within a polygon object. Adapted from:
    https://www.earthdatascience.org/courses/earth-analytics-python/spatial-data-vector-shapefiles/clip-vector-data-in-python-geopandas-shapely/

    Keyword arguments:
    bboxFile--path to the clipping polygon shapefile
    filePath--path to the line shapefile to be clipped
    outputFile--path for the clipped output shapefile
    """
    #create a single clipping polygon
    clipLayer = gp.read_file(bboxFile)
    lineLayer = gp.read_file(filePath)
    if type(clipLayer) == gp.GeoDataFrame:  #if using a geodataframe with multiple features
        poly = clipLayer.geometry.unary_union
    elif type(clipLayer) == Polygon:
        poly = clipLayer
    #create a spatial index of the input data (to be clipped)
    sp_index = lineLayer.sindex
    #the bounding box of the clipping polygon
    bbox = poly.bounds
    #filter the input data to the data that overlaps with the boundaries of the polygon
    sidx = list(sp_index.intersection(bbox))
    subset = lineLayer.iloc[sidx]
    #clip the subset with the actual polygon feature
    clipped = subset.copy()
    clipped['geometry'] = subset.intersection(poly)
    #keep the features that have non-null geometries
    clipped = clipped[clipped.geometry.notnull()]
    clipped.crs = CRS
    clipped.to_file(outputFile)
def clip_polyPolygons(gdf, clipLayer):
    """
    Return the polygons of a GeoDataFrame whose exterior/centroid points fall
    within a clipping area.
    """
    simGDF = shapeID(gdf)
    simGDF = fromMultiToSimpleGeometry(simGDF)
    polyCoords = [(simGDF['FeatureID'][idx], MultiPoint(poly.exterior.coords[:] + poly.centroid.coords[:]))
                  for idx, poly in enumerate(simGDF.geometry)]
    #create a df from the coords
    df_ = pd.DataFrame(list(list(zip(*polyCoords))[0]), columns={'FeatureID': []})
    #create a geo df
    polyPoints = gp.GeoDataFrame(df_, geometry=gp.GeoSeries(list(zip(*polyCoords))[1]))
    #clip the polygon points and keep the matching feature ids
    ids = clip_polyPoints(polyPoints, clipLayer)['FeatureID'].unique()
    #keep only the polygons whose ids are within the clipping area
    clipped = simGDF.loc[simGDF['FeatureID'].apply(lambda x: x in ids)]
    return clipped
# def clip_spatialFeatures(gdf, bbox):
#     """
#     Parameters
#     ----------
#     gdf: geopandas geodataframe
#     bbox: shapely polygon
#         bounding box
#
#     Returns
#     -------
#     geopandas geodataframe
#     """
#     if any(gdf.geom_type == 'Polygon'):
#         clipped = clip_polyPolygons(gdf, bbox)
#     elif any(gdf.geom_type == 'Point'):
#         clipped = clip_polyPoints(gdf, bbox)
#     elif any(gdf.geom_type == 'LineString'):
#         clipped = clip_polyLines(gdf, bbox)
#     return clipped
def shapeID(gdf):
    """
    Create a geo dataframe that contains only the geometry and a custom ID

    Parameters
    ----------
    gdf: geopandas geodataframe

    Returns
    -------
    geopandas geodataframe
    """
    gdf = gdf[gdf.geometry.notnull()].reset_index(drop=1)  #drop null geometries
    gdf['FeatureID'] = range(0, len(gdf))
    #reshape the geodataframe
    gdf = gdf[['FeatureID', 'geometry']]
    return gdf
def fromMultiToSimpleGeometry(gdf):
    """
    Explode Multi* geometries of a GeoDataFrame into simple (single-part) geometries.
    """
    if len(gdf) > 0:
        from shapely.geometry import MultiLineString, MultiPoint, MultiPolygon
        newGdf = pd.DataFrame(columns={col for col in list(gdf.columns)})
        index = 0
        for idx, shape in enumerate(gdf.geometry):
            if shape.geom_type in ['MultiLineString', 'MultiPoint', 'MultiPolygon']:
                for shape_geom in shape:
                    newGdf.loc[index, :] = gdf.loc[idx, :].copy()
                    newGdf['geometry'][index] = shape_geom
                    index += 1
            else:
                newGdf.loc[index, :] = gdf.loc[idx, :].copy()
                index += 1
        newGdf = gp.GeoDataFrame(newGdf, geometry=newGdf['geometry'])
        newGdf.crs = CRS
        return newGdf
    else:
        return gdf
# Cut lines - get a specific segment
def cut(line, distance):
    """
    Cuts a line in two at a distance from its starting point
    This is taken from the shapely manual
    """
    if distance <= 0.0 or distance >= line.length:
        return [LineString(line)]
    coords = list(line.coords)
    for i, p in enumerate(coords):
        pd = line.project(Point(p))
        if pd == distance:
            return [
                LineString(coords[:i + 1]),
                LineString(coords[i:])]
        if pd > distance:
            cp = line.interpolate(distance)
            return [
                LineString(coords[:i] + [(cp.x, cp.y)]),
                LineString([(cp.x, cp.y)] + coords[i:])]
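# Illustrative usage sketch (hypothetical values):
#   >>> parts = cut(LineString([(0, 0), (10, 0)]), 4)
#   >>> [p.length for p in parts]
#   [4.0, 6.0]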
def cut_line_at_points(line, points):
    """
    Cut a line at multiple points

    Parameters
    ----------
    line : shapely LineString
        line to cut
    points : shapely points
        points to cut the line with

    Returns
    -------
    list of shapely LineStrings
        cut lines
    """
    # First coords of line
    coords = list(line.coords)
    # Keep list coords where to cut (cuts = 1)
    cuts = [0] * len(coords)
    cuts[0] = 1
    cuts[-1] = 1
    # Add the coords from the points
    coords += [list(p.coords)[0] for p in points]
    cuts += [1] * len(points)
    # Calculate the distance along the line for each point
    dists = [line.project(Point(p)) for p in coords]
    # sort the coords/cuts based on the distances
    # see http://stackoverflow.com/questions/6618515/sorting-list-based-on-values-from-another-list
    coords = [p for (d, p) in sorted(zip(dists, coords))]
    cuts = [p for (d, p) in sorted(zip(dists, cuts))]
    # generate the Lines
    #lines = [LineString([coords[i], coords[i+1]]) for i in range(len(coords)-1)]
    lines = []
    for i in range(len(coords) - 1):
        if cuts[i] == 1:
            # find next element in cuts == 1 starting from index i + 1
            j = cuts.index(1, i + 1)
            lines.append(LineString(coords[i:j + 1]))
    return lines
def split_line_with_points(line, points):
    """Split a line string into several segments considering a list of points.

    The points used to cut the line are assumed to be in the line string
    and given in the order of appearance they have in the line string.

    Parameters
    ----------
    line : shapely LineString
    points : shapely points

    Returns
    -------
    list of shapely LineStrings

    >>> line = LineString( [(1,2), (8,7), (4,5), (2,4), (4,7), (8,5), (9,18),
    ... (1,2),(12,7),(4,5),(6,5),(4,9)] )
    >>> points = [Point(2,4), Point(9,18), Point(6,5)]
    >>> [str(s) for s in split_line_with_points(line, points)]
    ['LINESTRING (1 2, 8 7, 4 5, 2 4)', 'LINESTRING (2 4, 4 7, 8 5, 9 18)', 'LINESTRING (9 18, 1 2, 12 7, 4 5, 6 5)', 'LINESTRING (6 5, 4 9)']
    """
    segments = []
    current_line = line
    for p in points:
        d = current_line.project(p)
        seg, current_line = cut(current_line, d)
        segments.append(seg)
    segments.append(current_line)
    return segments
#Projection Transformations
def wgs_to_bng(center):
    """
    Project to BNG

    Parameters
    ----------
    center: tuple

    Returns
    -------
    tuple
    """
    inProj = Proj(init='epsg:4326')
    outProj = Proj(init='epsg:27700')
    x1, y1 = center
    x2, y2 = transform(inProj, outProj, x1, y1)
    return (x2, y2)
def bng_to_wgs(center):
    """
    Transform coordinates from BNG to WGS

    Parameters
    ----------
    center: tuple

    Returns
    -------
    tuple
    """
    inProj = Proj(init='epsg:27700')
    outProj = Proj(init='epsg:4326')
    x1, y1 = center
    x2, y2 = transform(inProj, outProj, x1, y1)
    return (x2, y2)
def bng_to_3857(center):
    """
    Transform coordinates from BNG to Web Mercator (EPSG:3857)

    Parameters
    ----------
    center: tuple

    Returns
    -------
    tuple
    """
    inProj = Proj(init='epsg:27700')
    outProj = Proj(init='epsg:3857')
    x1, y1 = center
    x2, y2 = transform(inProj, outProj, x1, y1)
    return (x2, y2)
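# Illustrative usage sketch (hypothetical coordinates); with the legacy pyproj
# 'init=' interface the tuples are (longitude, latitude) / (easting, northing):
#   >>> easting, northing = wgs_to_bng((-0.1276, 51.5072))
#   >>> x_merc, y_merc = bng_to_3857((easting, northing))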
def select_segment(orig_pt, dest_pt, line):
    """
    Parameters
    ----------
    orig_pt: shapely Point
        start of the segment | should be part of the segment
    dest_pt: shapely Point
        end of the segment | should be part of the segment
    line: shapely LineString
        line containing the subsegment

    Returns
    -------
    shapely LineString
    """
    cutPoints = [orig_pt, dest_pt]
    line_segment = cut_line_at_points(line, cutPoints)
    coords = [seg.coords[:] for seg in line_segment]
    #'line_segment' is a list of lines - to find the line that lies between the
    #origin & destination points we check which segment contains both of them:
    for i in range(len(coords)):
        if (orig_pt.coords[:][0] in coords[i]) & (dest_pt.coords[:][0] in coords[i]):
            segment = LineString(coords[i])
            return segment
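# Illustrative usage sketch (hypothetical values); both points are assumed to lie
# exactly on the line (e.g. after snapping):
#   >>> track = LineString([(0, 0), (50, 0), (100, 0)])
#   >>> seg = select_segment(Point(50, 0), Point(100, 0), track)
#   >>> seg.length
#   50.0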
# def preProcessContext(contextPath, bufferCSV, bbox, savePath):
#     """
#     contextPath: the folder path where all shapefiles are stored
#     bufferCSV: the csv with the buffer distances
#     bbox: the clipping area for the context data
#     savePath: the folder where all the data should be saved
#     """
#     allShp = glob.glob(contextPath + "/*.shp")  #load all shapefiles
#     for file_ in allShp:
#         geodata = st.shapeID(gpd.read_file(file_))
#         geodata = st.fromMultiToSimpleGeometry(geodata)
#         clipped = st.clip_spatialFeatures(geodata, bbox)
#         if len(clipped) > 0:
#             LayerName = str(os.path.splitext(os.path.split(file_)[1])[0])
#             clipped['Layer'] = LayerName
#             dist = bf.loc[bf['Asset'] == LayerName, 'Dist'].values[0]  #buffer distance
#             clipped['geometry'] = clipped.buffer(dist)  #calculate the buffers
#             print(LayerName)
#             dissolved = clipped.dissolve(by='Layer')  #dissolve the layer
#             geoFile = gpd.GeoDataFrame(dissolved, geometry=gpd.GeoSeries(dissolved['geometry']))
#             geoFile.crs = CRS
#             geoFile.to_file(os.path.join(savePath, 'Processed_' + LayerName + '.shp'))
#         else:
#             pass
def circuity_area(shape, origin, destination, gmax, fill_value=1.):
    """
    Calculate circuity

    Parameters
    ----------
    shape : tuple
        (i,j) shape of the raster
    origin : tuple
        origin point
    destination : tuple
        destination point
    gmax : float
        maximum circuity
    fill_value : float [optional] default=1.
        value to assign to points outside of allowed circuity
    """
    # circuity
    circuity = np.zeros(shape)
    evaluation = np.zeros(shape)
    #compute the circuity
    for i in range(shape[0]):
        for j in range(shape[1]):
            calculatedValue = va.airline_dist(origin[0], origin[1], destination[0], destination[1], i, j)[0]
            circuity[i, j] = calculatedValue
            calculatedValue = va.evaluate_circuity(calculatedValue, gmax)
            evaluation[i, j] = calculatedValue
    # masked path for visualization
    x = np.ma.array(evaluation, mask=evaluation, fill_value=fill_value)
    reachable = x.filled()
    return reachable
def clipOGR(bboxFile, filePath, outputFile):
    """
    Clipping function using ogr2ogr

    Parameters
    ----------
    bboxFile : str
        Path to the bbox.shp file
    filePath : str
        Path for the shp to be clipped
    outputFile : str
        Path to the clipped .shp

    Returns
    -------
    None
    """
    callstr = ['ogr2ogr', '-clipsrc', bboxFile, outputFile, filePath]
    proc = subprocess.Popen(callstr, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout, stderr = proc.communicate()
    # print(stdout, stderr)
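# Illustrative usage sketch (hypothetical file paths); requires the ogr2ogr
# command-line tool to be available on the system PATH:
#   >>> clipOGR('data/bbox.shp', 'data/rivers.shp', 'output/rivers_clipped.shp')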
def clipShp(contextPath, clippedPath, bboxFile):
    """
    Take a series of shapefiles in a folder and clip them to a bbox.
    Returns a series of clipped shapefiles.

    Parameters
    ----------
    contextPath: str
        the path of the folder where shapefiles are stored
    clippedPath: str
        the path of the folder where the result shapefiles will be stored
    bboxFile: str
        the path of the bbox.shp

    Returns
    -------
    None
    """
    allShp = glob.glob(contextPath + "/*.shp")  #load all shapefiles
    for file_ in allShp:
        LayerName = str(os.path.splitext(os.path.split(file_)[1])[0])
        filePath = file_
        outputFile = os.path.join(clippedPath, LayerName + '.shp')
        clipOGR(bboxFile, filePath, outputFile)
def bufferShp(bf, clippedPath, outputPath):
    """
    Take a series of shapefiles and apply a buffer zone based on a csv file.
    The geometries are dissolved and saved in an output folder location.

    Parameters
    ----------
    bf: pd.DataFrame
        buffer distances
    clippedPath: str
        folder path where all shapefiles are stored
    outputPath: str
        the folder where result layers will be stored

    Returns
    -------
    None
    """
    allShp = glob.glob(clippedPath + "/*.shp")
    for file_ in allShp:
        try:
            inputFile = gpd.read_file(file_)
            inputFile.crs = CRS
            LayerName = str(os.path.splitext(os.path.split(file_)[1])[0])
            inputFile['Layer'] = LayerName
            #buffer
            dist = bf.loc[bf['Asset'] == LayerName, 'Dist'].values[0]  #buffer distance
            inputFile['geometry'] = inputFile.buffer(dist)  #calculate the buffers
            #dissolve
            dissolved = inputFile.dissolve(by='Layer')  #dissolve the layer
            #save
            geoFile = gpd.GeoDataFrame(dissolved, geometry=gpd.GeoSeries(dissolved['geometry']))
            geoFile.crs = CRS
            geoFile.to_file(os.path.join(outputPath, 'Processed_' + LayerName + '.shp'))
        except Exception:
            #skip layers without a matching buffer distance or with invalid geometries
            pass
def clipGPD(inputFile, bbox):
    """
    Workaround for clipping problems using geopandas.
    https://gis.stackexchange.com/questions/280535/known-intersecting-polygons-returning-false-for-intersects-in-geopandas

    Parameters
    ----------
    inputFile : geodataFrame
        File to be clipped
    bbox : geodataFrame
        Bounding box to clip the inputFile

    Returns
    -------
    clipped_file : geodataFrame
        Clipped file
    """
    sj = gpd.sjoin(inputFile, bbox, how='inner', op='intersects')
    temp_file = inputFile.geom_almost_equals(sj)
    temp_file = temp_file[~temp_file.index.duplicated()]
    clipped_file = inputFile[temp_file]
    return clipped_file
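# Illustrative usage sketch (hypothetical paths); note that features intersecting
# the bbox are returned whole rather than geometrically cut:
#   >>> roads = gpd.read_file('data/roads.shp')   # hypothetical input layer
#   >>> bbox = gpd.read_file('data/bbox.shp')     # hypothetical clipping layer
#   >>> subset = clipGPD(roads, bbox)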
def fromCoordsToBbox(minx, miny, maxx, maxy, savePath, dist=50):
    """
    Create a rectangular bounding-box polygon from min/max coordinates,
    optionally buffered by `dist`, and save it to savePath as bbox.shp.
    """
    p1 = Point(minx, miny)
    p2 = Point(maxx, miny)
    p3 = Point(maxx, maxy)
    p4 = Point(minx, maxy)
    pointList = [p1, p2, p3, p4]
    poly = geometry.Polygon([[p.x, p.y] for p in pointList])
    if dist > 0:
        poly = poly.buffer(dist)
    poly_gdf = gpd.GeoDataFrame(geometry=gpd.GeoSeries(poly))
    poly_gdf.crs = CRS
    # poly_gdf['geometry'] = poly_gdf['geometry'].to_crs(epsg=27700)
    poly_gdf.to_file(os.path.join(savePath, 'bbox.shp'))
    return poly
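# Illustrative usage sketch (hypothetical BNG coordinates and output folder, which
# must already exist):
#   >>> bbox_poly = fromCoordsToBbox(530000, 180000, 531000, 181000, 'output', dist=50)
#   >>> bbox_poly.geom_type
#   'Polygon'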
def Assets(PATH_OS, a, FNAME_BUFFER):
    """
    PATH_OS: Path to the folder where all input layers are stored
    a: shapely.LineString of the alignment
    FNAME_BUFFER: the path to the buffer csv
    """
    bf = pd.read_csv(FNAME_BUFFER)  #read the buffer distances
    assetShp = glob.glob(PATH_OS + "/*.shp")  #the shapefiles
    #list of the assets that will be placed on the alignment
    assetNames = ['RailwayStation', 'LevelCrossings', 'RailwayTunnel', 'Bridges']
    asset_dict = {}
    alignment_points = gpd.GeoDataFrame(geometry=gpd.GeoSeries([Point(x) for x in a.coords[:]])).reset_index()
    alignment_points.crs = CRS
    for file_ in assetShp:
        fileName = str(os.path.splitext(os.path.split(file_)[1])[0])
        if fileName in assetNames:
            try:
                asset = gpd.read_file(file_)  #read the file as gdf
                asset = asset[asset.geometry.notnull()].reset_index(drop=1)  #remove any null geometry
                if any(shape.geom_type in ['MultiLineString', 'MultiPoint', 'MultiPolygon'] for shape in asset.geometry):
                    asset = fromMultiToSimpleGeometry(asset)  #simplify geometries
                asset.crs = CRS  #assign correct projection
                #Clip assets within the designated boundaries of the alignment
                dist = bf.loc[bf['Asset'] == fileName]['Dist'].values[0]  #buffer distance based on the CSV
                SingleSideBuffer = gpd.GeoDataFrame(geometry=gpd.GeoSeries(a.buffer(dist)))  #buffer around the alignment
                SingleSideBuffer.crs = CRS
                clipped_asset = clipGPD(asset, SingleSideBuffer).reset_index(drop=1)
                if len(clipped_asset) > 0:  #only if there are assets to snap
                    if any(clipped_asset.geom_type == 'LineString'):
                        snapped_asset = []
                        for idx, line in enumerate(clipped_asset.geometry):
                            sample = gpd.GeoDataFrame(geometry=gpd.GeoSeries([Point(x) for x in line.coords[:]]))
                            snapped_pts = snap_points(sample, a)  #snap the points of the line to the rail
                            clipped_asset['geometry'][idx] = LineString(list(snapped_pts.geometry))  #re-create the line from the snapped points
                        snapped_asset = clipped_asset.copy()
                    else:
                        #Snap the assets to the alignment
                        snapped_asset = snap_points(clipped_asset, a)
                    #Buffer for the snapped assets (real structures)
                    buffer_asset = snapped_asset.buffer(dist)
                    bufferGDF = gpd.GeoDataFrame(geometry=gpd.GeoSeries(buffer_asset))
                    bufferGDF.crs = CRS
                    #add the index of the points in the alignment that fall within the asset's buffers
                    name = str(os.path.splitext(os.path.split(file_)[1])[0])
                    value = clipGPD(alignment_points, bufferGDF).reset_index(drop=1)['index'].values
                    asset_dict[name] = value
                else:
                    pass
            except Exception:
                #skip layers that cannot be read or processed
                pass
    return asset_dict
def plot_metrics_basemap(self, plotTitle, metric, fn):
    """
    plotTitle: string - title of the plot
    metric: np.array
    fn: folder path where the .png will be saved
    """
    metric = np.nan_to_num(metric)  #make sure that there are no NaN values
    pts = self.line.coords[:]
    #coordinates to EPSG:3857
    x_range = list(zip(*[bng_to_3857(x) for x in pts]))[0]
    y_range = list(zip(*[bng_to_3857(x) for x in pts]))[1]
    # tile_provider = get_provider(Vendors.STAMEN_TERRAIN)
    tile_provider = get_provider(Vendors.CARTODBPOSITRON)
    #define the plot window
    p = figure(x_range=(min(x_range), max(x_range)),
               y_range=(min(y_range), max(y_range)),
               plot_height=800, plot_width=1200,
               x_axis_type="mercator", y_axis_type="mercator",
               toolbar_location=None)
    #remove both axes
    p.axis.visible = False
    #add title
    t = Title()
    t.text_font_size = '14pt'
    t.offset = 1.0
    t.text = 'METRIC: ' + plotTitle
    p.title = t
    #add the basemap from tile provider
    p.add_tile(tile_provider)
    #add the data points
    source = ColumnDataSource(data=dict(y=list(y_range), x=list(x_range), value=list(metric)))
    #color scheme
    mapper = linear_cmap(field_name='value', palette=Inferno10, low=min(metric), high=max(metric))
    #plot data on the basemap
    p.circle(x='x', y='y', line_color=mapper, color=mapper, fill_alpha=0.6, size=2, source=source)
    #add the colorbar
    color_bar = ColorBar(color_mapper=mapper['transform'], height=8, location=(0, 0), orientation="horizontal")
    p.add_layout(color_bar, 'below')
    #save map to directory
    return export_png(p, filename=os.path.join(fn, str(plotTitle) + '.png'))
def speed_pts_to_segs(speed_pt, alignment, track_id, bf_dist=20.):
    """
    Parameters
    ----------
    speed_pt : str or shapely Point/MultiPoint series
        shapefile filename or shapely geometry of speed points
    alignment : Alignment or LineString
        alignment
    track_id : int
        track id
    bf_dist : float
        buffer distance (in CRS units) used to clip the speed points
    """
    if not isinstance(alignment, LineString):
        alignment = alignment._line
    assert isinstance(alignment, LineString), "alignment must be a LineString object, given {}".format(type(alignment))
    if isinstance(speed_pt, str):
        speed_pt = gp.read_file(speed_pt)
    #Convert to Point geometry
    if speed_pt.geometry[0].geom_type == 'MultiPoint':
        speed_pt = fromMultiToSimpleGeometry(speed_pt)
    #clip the speed points within a buffer
    clip_bf = gp.GeoDataFrame(geometry=gp.GeoSeries(alignment.buffer(bf_dist)))
    clip_bf.crs = CRS  #set the correct CRS
    clipped_speed = clipGPD(speed_pt, clip_bf)
    # clipped_speed = clip_polyPoints(speed_pt, clip_bf.geometry[0])
    # clipped_speed.to_file(r'C:\Temp\speed_test.shp')  #debug leftover (hard-coded path)
    #filter on track id
    #e.g. here we take track 2100, the Down Fast track (the left track)
    flt_speed = clipped_speed.loc[clipped_speed['Track_ID'] == track_id].reset_index(drop=1)
    outProj = Proj("+init=EPSG:27700")
    inProj = Proj("+init=EPSG:4326")  # WGS84 in degrees and not EPSG:3857 in meters
    #WGS84 => BNG - start & end points
    startPoints = [Point(transform(inProj, outProj, flt_speed['Start_Long'][i], flt_speed['Start_Lati'][i])) for i in range(len(flt_speed))]
    endPoints = [Point(transform(inProj, outProj, flt_speed['End_Longit'][i], flt_speed['End_Latitu'][i])) for i in range(len(flt_speed))]
    #make a copy of the speed geo-dataframe and drop the geometry column
    BNGSpeed = flt_speed.copy().drop('geometry', axis=1)
    #first we snap the points to the line
    snappedStartPoints = snap_points(gp.GeoDataFrame(geometry=gp.GeoSeries(startPoints)), alignment)
    snappedEndPoints = snap_points(gp.GeoDataFrame(geometry=gp.GeoSeries(endPoints)), alignment)
    #add the geometry columns in the dataset
    BNGSpeed['geometry_s'] = snappedStartPoints.geometry
    BNGSpeed['geometry_e'] = snappedEndPoints.geometry
    #apply a unique id to both original and modified dfs - in case we want to join information between the two
    BNGSpeed['UniqueID'] = [BNGSpeed['ELR'][i] + str(BNGSpeed['Start_Mile'][i]) for i in range(len(BNGSpeed))]
    flt_speed['UniqueID'] = [flt_speed['ELR'][i] + str(flt_speed['Start_Mile'][i]) for i in range(len(flt_speed))]
    #order the speed points based on their distance from the start of the line
    BNGSpeed['Dist'] = [alignment.project(Point(p)) for p in BNGSpeed['geometry_s']]
    BNGSpeed = BNGSpeed.sort_values(by='Dist', ascending=True).reset_index(drop=1)
    speedLines = []
    for i in range(len(BNGSpeed) - 1):  #iterating through the points
        p1 = BNGSpeed['geometry_s'][i]
        p2 = BNGSpeed['geometry_e'][i]
        cutPoints = [p1, p2]
        split = split_line_with_points(alignment, cutPoints)
        speedLines.append((split[1], BNGSpeed['Permissibl'][i]))
    splitSpeed = pd.DataFrame(speedLines, columns={'geometry': [], 'Speed': []})
    splitSpeed = gp.GeoDataFrame(splitSpeed, geometry=splitSpeed['geometry'])
    splitSpeed.crs = CRS
    return splitSpeed['geometry'].values
def get_clipping_bounds(bbox, raster):
    """
    Parameters
    ----------
    bbox : str
        bbox shp path | e.g. C:\\bbox\\bbox.shp
    raster : str
        raster path | e.g. C:\\raster\\raster.tif

    Returns
    -------
    coordinates of the bounding box (minx, miny, maxx, maxy)
    """
    #create a dictionary with min/max coords
    bounds = {'minx': [], 'miny': [], 'maxx': [], 'maxy': []}
    #get the bounds of the shapefile - clipping mask
    minx, miny, maxx, maxy = gpd.read_file(bbox).bounds.values[0]
    bounds['minx'].append(minx)
    bounds['miny'].append(miny)
    bounds['maxx'].append(maxx)
    bounds['maxy'].append(maxy)
    #get the bounds of the raster
    r = gdal.Open(raster)
    band = r.GetRasterBand(1)
    (upper_left_x, x_size, x_rotation, upper_left_y, y_rotation, y_size) = r.GetGeoTransform()
    RasterXSize, RasterYSize = r.RasterXSize, r.RasterYSize
    #calculate bounds of the raster
    bounds['minx'].append(upper_left_x)
    bounds['maxy'].append(upper_left_y)
    bounds['miny'].append(upper_left_y + y_size * RasterYSize)
    bounds['maxx'].append(upper_left_x + x_size * RasterXSize)
    #compare the bounds and choose the exact clipping mask
    minx = max(bounds['minx'])
    miny = max(bounds['miny'])
    maxx = min(bounds['maxx'])
    maxy = min(bounds['maxy'])
    return minx, miny, maxx, maxy
#Translate rasters: from .asc to .tif
def translate_raster(inputfile, outputfile):
    """
    Parameters:
    inputfile: str
        path and file name of the .asc file to convert
    outputfile: str
        path and file for the created .tif file
    Returns:
        .tif file
    """
    translate_command = ["gdal_translate", "-of", "EHdr", inputfile, outputfile]
    subprocess.call(translate_command, shell=True)
def clip_raster(bbox, raster, raster_out):
    """
    Parameters:
    bbox: str
        path to the bbox shapefile
    raster: str
        path to the raster file to be clipped
    raster_out: str
        path to the clipped raster
    Returns:
        .tif file clipped to the given bounding box
    """
    minx, miny, maxx, maxy = get_clipping_bounds(bbox, raster)
    clip = ["gdalwarp", "-tr", "3", "3",
            "-te", str(minx), str(miny), str(maxx), str(maxy),
            "-te_srs", "EPSG:27700", "-nosrcalpha", "-dstnodata", "-9999",
            "-r", "near", "-wo", "SOURCE_EXTENT=1000",
            "-crop_to_cutline", raster, raster_out]
    # subprocess.call(clip, shell=True)
    # print(' '.join(clip))
    subprocess.run(clip, shell=True)
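# Illustrative usage sketch (hypothetical paths); requires the gdalwarp command-line
# tool on the system PATH, and resamples the output to the 3x3 cell size set by -tr:
#   >>> clip_raster('data/bbox.shp', 'data/dtm.tif', 'output/dtm_clipped.tif')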
def fill_gaps(elevation, max_distance=100):
    """Fill gaps in SRTM elevation data for which the distance from a
    missing pixel to the nearest existing one is smaller than `max_distance`.

    Parameters
    ----------
    elevation : numpy.ndarray
        SRTM elevation data (in meters)
    max_distance : int
        maximal distance (in pixels) between a missing point and the nearest valid one

    Attribution
    -----------
    "https://programtalk.com/python-examples/osgeo.gdal.FillNodata/"

    Returns
    -------
    elevation : numpy.ndarray
        SRTM elevation data with filled gaps
    """
    warnings.warn("The fill_gaps function has been deprecated. "
                  "See the \"What's new\" section for v0.14.")
    src_ds = gdal_array.OpenArray(elevation)
    srcband = src_ds.GetRasterBand(1)
    dstband = srcband
    maskband = srcband
    smoothing_iterations = 0
    options = []
    gdal.FillNodata(dstband, maskband, max_distance, smoothing_iterations,
                    options, callback=None)
    elevation = dstband.ReadAsArray()
    return elevation
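# Illustrative usage sketch (hypothetical array); how gap cells are detected depends
# on how nodata is encoded in the in-memory band mask, so treat this as a rough outline:
#   >>> dem = (np.random.rand(50, 50) * 100).astype(np.float32)
#   >>> dem[20:23, 20:23] = 0.          # cells intended to be treated as gaps
#   >>> filled = fill_gaps(dem, max_distance=10)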