Python遥感开发之Hurst指数的实现
Python遥感开发之Hurst指数的实现
主要讲解Python实现Hurst指数,实现遥感下的Hurst指数,对Hurst指数进行分类,以及结合slope指数实现对未来变化趋势的分析。
文章目录
- Python遥感开发之Hurst指数的实现
- 0 什么是Hurst指数
- 1 Python实现Hurst指数
- 2 Python实现遥感下的Hurst指数
- 3 对Hurst指数进行分级
- 4 结合slope指数对未来变化趋势进行分级
0 什么是Hurst指数
Hurst 指数 H 是定量描述时间序列自相似性与长程依赖性的有效方法。
- H 取值为 0~1, 当 H=0. 5时, 则时间序列为相互独立、方差有限的随机序列;
- 当0. 5<H<1 时, 表明时间序列变化具有持续性, 未来的变化将与过去的变化趋势相一致;
- 当0<H <0. 5时,表明时间序列具有反持续性, 即过去的变化不具有可持续性。
1 Python实现Hurst指数
代码参考来源于《【技术分享】时间序列的Hurst指数计算》
import numpy as npdef Hurst(x):# x为numpy数组n = x.shape[0]t = np.zeros(n - 1) # t为时间序列的差分for i in range(n - 1):t[i] = x[i + 1] - x[i]mt = np.zeros(n - 1) # mt为均值序列,i为索引,i+1表示序列从1开始for i in range(n - 1):mt[i] = np.sum(t[0:i + 1]) / (i + 1)# Step3累积离差和极差,r为极差r = []for i in np.arange(1, n): # i为taocha = []for j in np.arange(1, i + 1):if i == 1:cha.append(t[j - 1] - mt[i - 1])if i > 1:if j == 1:cha.append(t[j - 1] - mt[i - 1])if j > 1:cha.append(cha[j - 2] + t[j - 1] - mt[i - 1])r.append(np.max(cha) - np.min(cha))s = []for i in np.arange(1, n):ss = []for j in np.arange(1, i + 1):ss.append((t[j - 1] - mt[i - 1]) ** 2)s.append(np.sqrt(np.sum(ss) / i))r = np.array(r)s = np.array(s)xdata = np.log(np.arange(2, n))ydata = np.log(r[1:] / s[1:])h, b = np.polyfit(xdata, ydata, 1)return hif __name__ == '__main__':x = np.array([1.59, 1.57, 1.56, 1.54, 1.52, 1.50, 1.47, 1.43, 1.41, 1.40, 1.39])print(Hurst(x))
2 Python实现遥感下的Hurst指数
import numpy as np
import os
from osgeo import gdaldef read_tif(filepath):dataset = gdal.Open(filepath)col = dataset.RasterXSize # 图像长度row = dataset.RasterYSize # 图像宽度geotrans = dataset.GetGeoTransform() # 读取仿射变换proj = dataset.GetProjection() # 读取投影data = dataset.ReadAsArray() # 转为numpy格式data = data.astype(np.float32)no_data_value = data[0][0] # 设定NoData值data[data == no_data_value] = np.nan # 将NoData转换为NaNreturn col, row, geotrans, proj, datadef save_tif(data, reference_file, output_file):ds = gdal.Open(reference_file)shape = data.shapedriver = gdal.GetDriverByName("GTiff")dataset = driver.Create(output_file, shape[1], shape[0], 1, gdal.GDT_Float32) # 保存的数据类型dataset.SetGeoTransform(ds.GetGeoTransform())dataset.SetProjection(ds.GetProjection())dataset.GetRasterBand(1).WriteArray(data)dataset.FlushCache()def get_tif_list(directory):"""获取目录下所有TIF文件的完整路径"""return [os.path.join(directory, f) for f in os.listdir(directory) if f.endswith('.tif')]def Hurst(x):# x为numpy数组n = x.shape[0]t = np.zeros(n - 1) # t为时间序列的差分for i in range(n - 1):t[i] = x[i + 1] - x[i]mt = np.zeros(n - 1) # mt为均值序列,i为索引,i+1表示序列从1开始for i in range(n - 1):mt[i] = np.sum(t[0:i + 1]) / (i + 1)# Step3累积离差和极差,r为极差r = []for i in np.arange(1, n): # i为taocha = []for j in np.arange(1, i + 1):if i == 1:cha.append(t[j - 1] - mt[i - 1])if i > 1:if j == 1:cha.append(t[j - 1] - mt[i - 1])if j > 1:cha.append(cha[j - 2] + t[j - 1] - mt[i - 1])r.append(np.max(cha) - np.min(cha))s = []for i in np.arange(1, n):ss = []for j in np.arange(1, i + 1):ss.append((t[j - 1] - mt[i - 1]) ** 2)s.append(np.sqrt(np.sum(ss) / i))r = np.array(r)s = np.array(s)xdata = np.log(np.arange(2, n))ydata = np.log(r[1:] / s[1:])h, b = np.polyfit(xdata, ydata, 1)return hif __name__ == '__main__':# 输入文件夹路径input_directory = r"E:\AAWORK\evi"output_directory = r"E:\AAWORK\hurst"# 读取文件列表file_list = get_tif_list(input_directory)# 读取第一个文件获取基本信息(列、行、仿射变换、投影)col, row, geotrans, proj, _ = read_tif(file_list[0])# 预分配结果数组hurst_data = np.zeros((row, col), dtype=np.float32)# 将所有 TIFF 文件的数据堆叠为一个三维数组 (时间, 行, 列)tif_stack = np.array([read_tif(f)[4] for f in file_list])# 遍历每个像素for i in range(row):print(f"Processing row {i+1}/{row}...")for j in range(col):pixel_values = tif_stack[:, i, j] # 取出所有时间点的该像素值if not np.any(np.isnan(pixel_values)): # 过滤掉包含NaN的像素h = Hurst(pixel_values)hurst_data[i, j] = h# 输出文件路径save_tif(hurst_data, file_list[0], os.path.join(output_directory, 'hurst.tif'))print("Processing completed!")
3 对Hurst指数进行分级
对Hurst指数分类成8个级别,级别分类参考某硕士论文内容
import numpy as np
from osgeo import gdaldef read_tif(filepath):dataset = gdal.Open(filepath)col = dataset.RasterXSize#图像长度row = dataset.RasterYSize#图像宽度geotrans = dataset.GetGeoTransform()#读取仿射变换proj = dataset.GetProjection()#读取投影data = dataset.ReadAsArray()#转为numpy格式data = data.astype(np.float32)a = data[0][0]data[data == a] = np.nanreturn [col, row, geotrans, proj, data]def save_tif(data, file, output):ds = gdal.Open(file)shape = data.shapedriver = gdal.GetDriverByName("GTiff")dataset = driver.Create(output, shape[1], shape[0], 1, gdal.GDT_Int16)#保存的数据类型dataset.SetGeoTransform(ds.GetGeoTransform())dataset.SetProjection(ds.GetProjection())dataset.GetRasterBand(1).WriteArray(data)if __name__ == '__main__':# 调用hurst_path = r"E:\AAWORK\hurst.tif"output_path = r"E:\AAWORK\hurst_等级.tif"col, row, geotrans, proj, hurst_data = read_tif(hurst_path)classified = np.zeros_like(hurst_data, dtype=np.int16)classified[hurst_data < 0.25] = 1classified[(hurst_data >= 0.25) & (hurst_data < 0.35)] = 2classified[(hurst_data >= 0.35) & (hurst_data < 0.45)] = 3classified[(hurst_data >= 0.45) & (hurst_data < 0.50)] = 4classified[(hurst_data >= 0.50) & (hurst_data < 0.55)] = 5classified[(hurst_data >= 0.55) & (hurst_data < 0.65)] = 6classified[(hurst_data >= 0.65) & (hurst_data < 0.75)] = 7classified[hurst_data >= 0.75] = 8save_tif(classified,hurst_path,output_path)
4 结合slope指数对未来变化趋势进行分级
结合slope指数,生成未来趋势结果,参考某硕士论文,slope指数不了解的请参考我的另外一个博客《Python遥感开发之数据趋势分析Sen+mk_sen+mk趋势分析-CSDN博客》
import numpy as np
import os
from osgeo import gdal, gdalnumericdef read_tif(filepath):dataset = gdal.Open(filepath)col = dataset.RasterXSize # 图像长度row = dataset.RasterYSize # 图像宽度geotrans = dataset.GetGeoTransform() # 读取仿射变换proj = dataset.GetProjection() # 读取投影data = dataset.ReadAsArray() # 转为numpy格式data = data.astype(np.float32)no_data_value = data[0][0] # 设定NoData值data[data == no_data_value] = np.nan # 将NoData转换为NaNreturn col, row, geotrans, proj, datadef save_tif(data, reference_file, output_file):ds = gdal.Open(reference_file)shape = data.shapedriver = gdal.GetDriverByName("GTiff")dataset = driver.Create(output_file, shape[1], shape[0], 1, gdal.GDT_Int32) # 保存的数据类型dataset.SetGeoTransform(ds.GetGeoTransform())dataset.SetProjection(ds.GetProjection())dataset.GetRasterBand(1).WriteArray(data)dataset.FlushCache()def read_tif02(file):data = gdalnumeric.LoadFile(file)data = data.astype(np.float32)a = data[0][0]data[data == a] = np.nanreturn datadef get_tif_list(directory):"""获取目录下所有TIF文件的完整路径"""return [os.path.join(directory, f) for f in os.listdir(directory) if f.endswith('.tif')]if __name__ == '__main__':# 输入文件夹路径slope_file = r"E:\AAWORK\slope.tif"hurst_file = r"E:\AAWORK\hurst.tif"out_file = r"E:\AAWORK"col, row, geotrans, proj, hurst_data = read_tif(hurst_file)slope_data= read_tif02(slope_file)classified = np.zeros_like(hurst_data, dtype=np.int16)classified[(hurst_data < 0.5) & (slope_data < 0)] = 1classified[(hurst_data > 0.5) & (slope_data < 0)] = 2classified[(hurst_data < 0.5) & (slope_data > 0)] = 3classified[(hurst_data > 0.5) & (slope_data > 0)] = 4save_tif(classified, slope_file, os.path.join(out_file, 'hurst&slope.tif'))print("Processing completed!")