hbase 工具类
pom.xml
< dependency> < groupId> org.apache.hbase</ groupId> < artifactId> hbase-client</ artifactId> < version> 2.5.10-hadoop3</ version>
</ dependency>
< dependency> < groupId> com.google.guava</ groupId> < artifactId> guava</ artifactId> < version> 33.2.1-jre</ version>
</ dependency>
RowKey注解
package cn. lhz. util. annotation ; import java. lang. annotation. Retention ;
import java. lang. annotation. RetentionPolicy ;
import java. lang. annotation. Target ;
import java. lang. annotation. ElementType ;
@Retention ( RetentionPolicy . RUNTIME)
@Target ( ElementType . FIELD)
public @interface RowKeyAnnotation {
}
hbase 自定义 过滤器
package cn. lhz. util. hbase ; import org. apache. hadoop. hbase. filter. FilterBase ;
import org. apache. hadoop. hbase. Cell ;
public class CustomFilter extends FilterBase { private final String targetValue; public CustomFilter ( String targetValue) { this . targetValue = targetValue; } @Override public ReturnCode filterCell ( Cell cell) { byte [ ] value = cell. getValueArray ( ) ; String cellValue = new String ( value, cell. getValueOffset ( ) , cell. getValueLength ( ) ) ; if ( cellValue. equals ( targetValue) ) { return ReturnCode . INCLUDE; } else { return ReturnCode . SKIP; } }
}
hbase 工具类
package cn. lhz. util. hbase ; import cn. lhz. util. annotation. RowKeyAnnotation ;
import org. apache. hadoop. conf. Configuration ;
import org. apache. hadoop. hbase. Cell ;
import org. apache. hadoop. hbase. CellUtil ;
import org. apache. hadoop. hbase. HBaseConfiguration ;
import org. apache. hadoop. hbase. TableName ;
import org. apache. hadoop. hbase. client. * ;
import org. apache. hadoop. hbase. filter. Filter ;
import org. apache. hadoop. hbase. util. Bytes ; import java. io. IOException ;
import java. lang. reflect. Field ;
import java. util. * ;
import java. util. regex. Pattern ;
public class HbaseUtil { private final static Configuration conf = HBaseConfiguration . create ( ) ; public static Connection getConnection ( ) throws IOException { return ConnectionFactory . createConnection ( conf) ; } public static Admin getAdmin ( ) throws IOException { return getConnection ( ) . getAdmin ( ) ; } public static Admin getAdmin ( Connection connection) throws IOException { return connection. getAdmin ( ) ; } public static void closeConnection ( Connection connection) throws IOException { if ( connection != null ) { connection. close ( ) ; } } public static void closeAdmin ( Admin admin) throws IOException { if ( admin != null ) { admin. close ( ) ; } } public static void closeAdminAndConnection ( Admin admin, Connection connection) throws IOException { closeAdmin ( admin) ; closeConnection ( connection) ; } public static void createTable ( Admin admin, String name, String . . . columnFamilyNames) throws IOException { TableName tableName = TableName . valueOf ( name) ; TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder . newBuilder ( tableName) ; Collection < ColumnFamilyDescriptor > collection = new ArrayList < > ( ) ; if ( columnFamilyNames != null ) { for ( String columnFamilyName : columnFamilyNames) { collection. add ( ColumnFamilyDescriptorBuilder . newBuilder ( Bytes . toBytes ( columnFamilyName) ) . build ( ) ) ; } } tableDescriptorBuilder. setColumnFamilies ( collection) ; TableDescriptor tableDescriptor = tableDescriptorBuilder. build ( ) ; admin. createTable ( tableDescriptor) ; } public static boolean deleteTable ( Admin admin, String name) throws IOException { TableName tableName = TableName . valueOf ( name) ; if ( admin. tableExists ( tableName) ) { admin. disableTable ( tableName) ; admin. deleteTable ( tableName) ; } return true ; } public static < E > boolean upsert ( Connection connection, String name, String columnFamilyName, E e) throws IOException , IllegalAccessException { TableName tableName = TableName . valueOf ( name) ; Table table = connection. getTable ( tableName) ; Class < ? > aClass = e. getClass ( ) ; Put put = null ; for ( Field field : aClass. getDeclaredFields ( ) ) { field. setAccessible ( true ) ; RowKeyAnnotation annotation = field. getAnnotation ( RowKeyAnnotation . class ) ; if ( annotation != null ) { put = new Put ( Bytes . toBytes ( String . valueOf ( field. get ( e) ) ) ) ; } else { if ( put == null ) { return false ; } put. addColumn ( Bytes . toBytes ( columnFamilyName) , Bytes . toBytes ( field. getName ( ) ) , Bytes . toBytes ( String . valueOf ( field. get ( e) ) ) ) ; } } if ( put != null ) { table. put ( put) ; return true ; } return false ; } public static boolean delete ( Connection connection, String name, String rowKey) throws IOException { TableName tableName = TableName . valueOf ( name) ; Table table = connection. getTable ( tableName) ; Delete delete = new Delete ( Bytes . toBytes ( rowKey) ) ; table. delete ( delete) ; return true ; } public static Set < String > columnFamilyNames ( Admin admin, String name) throws IOException { TableName tableName = TableName . valueOf ( name) ; TableDescriptor tableDescriptor = admin. getDescriptor ( tableName) ; Set < String > columnFamilyNames = new HashSet < > ( ) ; for ( byte [ ] columnFamilyName : tableDescriptor. getColumnFamilyNames ( ) ) { columnFamilyNames. add ( Bytes . toString ( columnFamilyName) ) ; } return columnFamilyNames; } public static List < String > columnNames ( Connection connection, String name) throws IOException { List < String > columnNames = new ArrayList < > ( ) ; TableName tableName = TableName . valueOf ( name) ; Table table = connection. getTable ( tableName) ; Admin admin = connection. getAdmin ( ) ; TableDescriptor tableDescriptor = admin. getDescriptor ( tableName) ; ColumnFamilyDescriptor [ ] columnFamilies = tableDescriptor. getColumnFamilies ( ) ; for ( ColumnFamilyDescriptor columnFamilyDescriptor : columnFamilies) { Result result = table. getScanner ( columnFamilyDescriptor. getName ( ) ) . next ( ) ; for ( Cell cell : result. rawCells ( ) ) { byte [ ] column = CellUtil . cloneQualifier ( cell) ; columnNames. add ( Bytes . toString ( column) ) ; } } return columnNames; } public static < T > Class < T > selectByRowKey ( Connection connection, String name, String rowKey, Class < T > clazz) throws IOException , NoSuchFieldException , IllegalAccessException { Admin admin = connection. getAdmin ( ) ; TableName tableName = TableName . valueOf ( name) ; Table table = connection. getTable ( tableName) ; Get get = new Get ( Bytes . toBytes ( rowKey) ) ; Result result = table. get ( get) ; TableDescriptor tableDescriptor = admin. getDescriptor ( tableName) ; ColumnFamilyDescriptor [ ] columnFamilies = tableDescriptor. getColumnFamilies ( ) ; for ( ColumnFamilyDescriptor columnFamilyDescriptor : columnFamilies) { List < String > columnNames = new ArrayList < > ( ) ; byte [ ] columnFamilyByteName = columnFamilyDescriptor. getName ( ) ; Result rs = table. getScanner ( columnFamilyDescriptor. getName ( ) ) . next ( ) ; for ( Cell cell : rs. rawCells ( ) ) { byte [ ] column = CellUtil . cloneQualifier ( cell) ; columnNames. add ( Bytes . toString ( column) ) ; } for ( String columnName : columnNames) { String value = Bytes . toString ( result. getValue ( columnFamilyByteName, Bytes . toBytes ( columnName) ) ) ; Field field = clazz. getDeclaredField ( nameInDb2nameInJava ( columnName) ) ; field. setAccessible ( true ) ; field. set ( clazz, value) ; } } return clazz; } public static < T > List < Class < T > > filter ( Connection connection, String name, String columnFamilyName, String columnName, String columnValue, Class < T > clazz, int cacheCount) throws IOException , NoSuchFieldException , IllegalAccessException { List < Class < T > > list = new ArrayList < > ( ) ; Table table = connection. getTable ( TableName . valueOf ( name) ) ; Scan scan = new Scan ( ) ; scan. setCaching ( cacheCount) ; Filter customFilter = new CustomFilter ( columnValue) ; scan. setFilter ( customFilter) ; try ( ResultScanner scanner = table. getScanner ( scan) ) { for ( Result result : scanner) { for ( Field field : clazz. getDeclaredFields ( ) ) { field. setAccessible ( true ) ; field. set ( clazz, null ) ; } List < String > columnNames = new ArrayList < > ( ) ; Result rs = table. getScanner ( Bytes . toBytes ( columnFamilyName) ) . next ( ) ; for ( Cell cell : rs. rawCells ( ) ) { byte [ ] column = CellUtil . cloneQualifier ( cell) ; columnNames. add ( Bytes . toString ( column) ) ; } for ( String columnTempName : columnNames) { String value = Bytes . toString ( result. getValue ( Bytes . toBytes ( columnTempName) , Bytes . toBytes ( columnName) ) ) ; Field field = clazz. getDeclaredField ( nameInDb2nameInJava ( columnName) ) ; field. setAccessible ( true ) ; field. set ( clazz, value) ; } list. add ( clazz) ; } } return list; } public static String nameInDb2nameInJava ( String filedName) { String coluname = filedName. toLowerCase ( ) ; if ( Pattern . compile ( "^\\S+_+\\S+$" ) . matcher ( coluname) . find ( ) ) { char [ ] ca = coluname. toCharArray ( ) ; for ( int j = 1 ; j < ca. length - 1 ; j++ ) { if ( ca[ j] == '_' ) { ca[ j] = '\0' ; ca[ j + 1 ] = Character . toUpperCase ( ca[ j + 1 ] ) ; } } coluname = new String ( ca) ; } return coluname. replaceAll ( "\0" , "" ) ; }
}