All checks were successful
Build / Build-and-ng-test (pull_request) Successful in 4m6s
Closes #124
1456 lines
46 KiB
SAS
Executable File
1456 lines
46 KiB
SAS
Executable File
/**
|
|
@file
|
|
@brief Routine supporting multiple load types
|
|
@details Generic loader for multiple load types (UPDATE, SCD2, BITEMPORAL).
|
|
|
|
Handles all elements including metadata validation, PK checking, closeouts,
|
|
locking, logging, etc.
|
|
|
|
The staging table must be prepared with a unique business key. For bitemporal
|
|
this means a snapshot at both technical AND business time.
|
|
|
|
ASSUMPTIONS:
|
|
- Base table has relevant datetime vars: 2xTechnical, 2xBusiness, 1xProcessed
|
|
- Staging table omits Technical or Processed datetimes (has Business only)
|
|
- Base table has no column names containing the string "___TMP___"
|
|
- Base &tech_from variable is not nullable. This should always be the case
|
|
anyway when building permanent bitemporal datasets. But the point is that
|
|
this field is used to identify new records after the initial left join
|
|
from staging to base table.
|
|
|
|
NOTES:
|
|
- All queries against BiTemporal tables need two filter conditions, as such:
|
|
|
|
where &bus_from LE [tstamp] LT &bus_to
|
|
AND &tx_from LE [tstamp] LT &tx_to
|
|
|
|
One cannot use BETWEEN
|
|
One cannot use &xx_from LE [tstamp] LE &xx_to (this is equivalent to BETWEEN).
|
|
Background:
|
|
http://stackoverflow.com/questions/20005950/best-practice-for-scd-date-pairs-closing-opening-timestamps
|
|
|
|
Areas for optimisation
|
|
- loading temporal history (currently experimental)
|
|
|
|
## Supporting tables
|
|
|
|
Supporting tables must exist in the library specified in the `dclib` param.
|
|
|
|
### MPE_DATALOADS
|
|
|
|
This table is updated every time a successful load occurs, and includes
|
|
information such as:
|
|
|
|
@li library
|
|
@li dataset
|
|
@li message (supplied in the ETLSOURCE param)
|
|
@li new rows
|
|
@li deleted rows
|
|
@li changed rows
|
|
@li timestamp
|
|
@li the user making the load
|
|
@li the version of (this) macro used to make the load
|
|
|
|
|
|
@param [in] APPEND_DSN= (APPENDTABLE) Name of STAGING table
|
|
@param [in] CONFIG_TABLE= (&dclib..MPE_CONFIG) The table containing library
|
|
engine specific config. The following scopes are supported:
|
|
@li DCBL_REDSH
|
|
@param [in] LOADTYPE= (BITEMPORAL) Supported types:
|
|
@li BITEMPORAL - loads a buskey with business + version times
|
|
@li TXTEMPORAL - loads a buskey with version times
|
|
@li BUSTEMPORAL - loads a buskey with business times only (no version times)
|
|
@li UPDATE - updates a buskey with NO history
|
|
@param [in] PROCESSED= (0) Name of a column that receives the current timestamp
|
|
for new or changed records when loading the target table. Default is 0 (not
|
|
set). If the target table contains a variable called PROCESSED_DTTM, and
|
|
processed=0, then that column will be used for applying the current timestamp.
|
|
@param RK_MAXKEYTABLE= (mpe_maxkeyvalues) The maxkeytable to use (must exist
|
|
in DCLIB)
|
|
@param [in] PK= Business key, space separated. Should NOT include temporal
|
|
fields.
|
|
@param [in] RK_UNDERLYING= If supplied will generate an RK based on these
|
|
(space separated) business key fields. In this case only ONE PK field should
|
|
be supplied, which is assumed to be the RK. The RK field, plus underlying
|
|
fields, should all exist on the base table. The underlying fields should
|
|
exist on the staging table (the RK / PK field will be overwritten).
|
|
The staging table should also be unique on its PK.
|
|
|
|
@param [in] dclib= (&dc_libref) The library containing DC configuration tables
|
|
@param [out] outds_del= (work.outds_del) Output table containing
|
|
deleted records
|
|
@param [out] outds_add= (work.outds_add) Output table containing
|
|
appended records
|
|
@param [out] outds_mod= (work.outds_mod) Output table containing
|
|
changed records
|
|
@param [out] outds_audit= (0) Load detailed changes to an audit table. Uses
|
|
the mp_storediffs.sas macro. To enable, provide the audit table here as a
|
|
two-level name (libref.dataset).
|
|
|
|
<h4> Global Variables </h4>
|
|
The following global macro variables are used. These should be replaced by
|
|
macro parameters in future releases.
|
|
|
|
@li `dc_dttmtfmt`
|
|
|
|
<h4> SAS Macros </h4>
|
|
@li bitemporal_closeouts.sas
|
|
@li dc_assignlib.sas
|
|
@li mf_existds.sas
|
|
@li mf_existvar.sas
|
|
@li mf_fmtdttm.sas
|
|
@li mf_getattrn.sas
|
|
@li mf_getengine.sas
|
|
@li mf_getschema.sas
|
|
@li mf_getuniquefileref.sas
|
|
@li mf_getuniquename.sas
|
|
@li mf_getuser.sas
|
|
@li mf_getvarlist.sas
|
|
@li mf_verifymacvars.sas
|
|
@li mf_wordsinstr1butnotstr2.sas
|
|
@li mp_abort.sas
|
|
@li mp_dropmembers.sas
|
|
@li mp_lockanytable.sas
|
|
@li mp_lockfilecheck.sas
|
|
@li mp_retainedkey.sas
|
|
@li mp_storediffs.sas
|
|
|
|
@version 9.3
|
|
@author 4GL Apps Ltd.
|
|
@copyright 4GL Apps Ltd. This code may only be used within Data Controller
|
|
and may not be re-distributed or re-sold without the express permission of
|
|
4GL Apps Ltd.
|
|
|
|
@warning multitemporal loads (bitemporal for multiple points in business time)
|
|
are in experimental stage
|
|
|
|
**/
|
|
|
|
%macro bitemporal_dataloader(
|
|
bus_from= /* Business FROM datetime variable. Req'd on
|
|
STAGING & BASE tables.*/
|
|
,bus_to = /* Business TO datetime variable. Req'd on
|
|
STAGING & BASE tables. */
|
|
,bus_from_override= /* Provide a hard coded BUS_FROM datetime value.*/
|
|
,bus_to_override= /* provide a hard coded BUS_TO datetime value */
|
|
,tech_from= /* Technical FROM datetime variable. Req'd on
|
|
BASE table only. */
|
|
,tech_to = /* Technical TO datetime variable. Req'd on BASE
|
|
table only. */
|
|
,processed= 0
|
|
,base_lib=WORK /* Libref of the BASE table. */
|
|
,base_dsn=BASETABLE /* Name of BASE table. */
|
|
,append_lib=WORK /* Libref of the STAGING table. */
|
|
,append_dsn=APPENDTABLE
|
|
,high_date='01JAN5999:00:00:00'dt /* High date to close out records */
|
|
,PK= name sex
|
|
,RK_UNDERLYING=
|
|
,KEEPVARS= /* Provides option for removing unwanted vars from append table */
|
|
,RK_UPDATE_MAXKEYTABLE=NO /* If switching (or mix matching) with regular
|
|
SCD2 loader then set this switch to YES to
|
|
ensure the MAXKEYTABLE is updated with the
|
|
current maximum RK value for the target table
|
|
*/
|
|
,CHECK_UNIQUENESS=YES /* Perform a check of the APPEND table to ensure it is
|
|
unique on its business key */
|
|
,ETLSOURCE=demo /* supply a value ($50.) to show as ETLSOURCE in
|
|
&dclib..MPE_DATALOADS */
|
|
,LOADTYPE=BITEMPORAL
|
|
,RK_MAXKEYTABLE= mpe_maxkeyvalues
|
|
,LOG=1 /* Switch to 0 to prevent records being added to
|
|
&dclib..MPE_DATALOADS (ie when testing) */
|
|
,DELETE_COL= _____DELETE__THIS__RECORD_____
|
|
/* If this variable is found in the append dataset
|
|
then matching records are closed out (or deleted) in
|
|
the base table where that variable = "Yes" (any case) */
|
|
,LOADTARGET=YES /* set to anything but uppercase YES to switch off
|
|
target table load and generate temp tables only */
|
|
,CLOSE_VARS=
|
|
/*a problem with regular SCD2 or TXTEMPORAL loads is that there is
|
|
no facility to close out removed records (all records are
|
|
assumed new or changed). But how does one determine which
|
|
records are removed? Short of loading the entire table
|
|
each time? This parameter allows a set of variables
|
|
(this should be a subset of the PK) to be declared, and
|
|
the macro will determine which records in the base table
|
|
need to be closed out ahead of the load.
|
|
|
|
For instance, given the following:
|
|
|
|
Base Table              Staging Table
|
|
DATE ENTITY AMOUNT      DATE ENTITY AMOUNT
|
|
JAN  ACME4  66          JAN  ACME4  66
|
|
FEB  ACME4  99          FEB  ACME4  99
|
|
FEB  ACME1  22
|
|
|
|
By supplying DATE in CLOSE_VARS and DATE ENTITY as the PK,
|
|
the "FEB ACME1 22" record would get closed out.
|
|
*/
|
|
,config_table=&dclib..MPE_CONFIG
|
|
,dclib=&dc_libref
|
|
,outds_del=work.outds_del
|
|
,outds_add=work.outds_add
|
|
,outds_mod=work.outds_mod
|
|
,outds_audit=0
|
|
);
|
|
|
|
/* when changing this macro, update the version num here */
|
|
%local ver;
|
|
%let ver=32;
|
|
%put &sysmacroname entry vars:;
|
|
%put _local_;
|
|
|
|
%dc_assignlib(WRITE,&base_lib) /* may not already be assigned */
|
|
|
|
/* return straight away if nothing to load */
|
|
%let nobs= %mf_getattrn(&append_lib..&append_dsn,NLOBS);
|
|
%if &nobs=-1 %then %do;
|
|
proc sql noprint; select count(*) into: nobs from &append_lib..&append_dsn;
|
|
%end;
|
|
%if &nobs=0 %then %do;
|
|
%put NOTE:; %put NOTE-;%put NOTE-;%put NOTE-;
|
|
%put NOTE- Base dataset &append_lib..&append_dsn is empty. Nothing to upload!;
|
|
%put NOTE-;%put NOTE-;%put NOTE-;
|
|
%return;
|
|
%end;
|
|
|
|
/* hard exit if err condition exists */
|
|
%mp_abort(iftrue= (&syscc > 0)
|
|
,mac=bitemporal_dataloader
|
|
,msg=%str(Bitemporal transform / job aborted due to SYSCC=&SYSCC status;)
|
|
)
|
|
|
|
%local engine_type;
|
|
%let engine_type=%mf_getengine(&base_lib);
|
|
%if (&engine_type=REDSHIFT or &engine_type=POSTGRES) and %length(&CLOSE_VARS)>0
|
|
%then %do;
|
|
%put NOTE:; %put NOTE-;%put NOTE-;%put NOTE-;
|
|
%put NOTE- CLOSE_VARS functionality not yet supported in &engine_type;
|
|
%put NOTE-;%put NOTE-;%put NOTE-;
|
|
%return;
|
|
%end;
|
|
|
|
/**
|
|
* The metadata functions (eg mf_existvar) will fail if the base table has a
|
|
* SAS lock. So, make a snapshot of the base table for further use.
|
|
* Also, make output tables (regardless).
|
|
*/
|
|
%local basecopy;
|
|
%let basecopy=%mf_getuniquename(prefix=basecopy);
|
|
|
|
data &basecopy &outds_mod &outds_add &outds_del;
|
|
set &base_lib..&base_dsn;
|
|
stop;
|
|
run;
|
|
%mp_abort(iftrue= (&syscc > 0)
|
|
,mac=&_program
|
|
,msg=%str(syscc=&syscc after base table copy - aborting due to table lock)
|
|
)
|
|
|
|
|
|
%local cols idx_pk md5_col ;
|
|
%let md5_col=___TMP___md5;
|
|
%let check_uniqueness=%upcase(&check_uniqueness);
|
|
%let RK_UPDATE_MAXKEYTABLE=%upcase(&RK_UPDATE_MAXKEYTABLE);
|
|
%let high_date=%unquote(&high_date);
|
|
%let loadtype=%upcase(&loadtype);
|
|
|
|
/* ensure irrelevant variables are cleared */
|
|
%if &loadtype=BUSTEMPORAL %then %do;
|
|
%let tech_from=;
|
|
%let tech_to=;
|
|
%end;
|
|
%else %if &loadtype=TXTEMPORAL or &loadtype=UPDATE %then %do;
|
|
%let bus_from=;
|
|
%let bus_to=;
|
|
%end;
|
|
|
|
/* ensure relevant variables are supplied */
|
|
%mp_abort(iftrue=(&loadtype=BITEMPORAL & %mf_verifymacvars(bus_from bus_to)=0)
|
|
,mac=bitemporal_dataloader
|
|
,msg=%str(Missing BUS_FROM / BUS_TO)
|
|
)
|
|
%mp_abort(iftrue=(&loadtype=TXTEMPORAL & %mf_verifymacvars(tech_from tech_to)=0)
|
|
,mac=bitemporal_dataloader
|
|
,msg=%str(Missing TECH_FROM / TECH_TO)
|
|
)
|
|
|
|
/**
|
|
* drop any tables (may be defined as views or vice versa preventing overwrite)
|
|
*/
|
|
%mp_dropmembers(append bitemp0_append bitemp_cols)
|
|
|
|
/* SQL Server requires its own time values */
|
|
/* 9.2 will only give picture format down to seconds. 9.3 allows
|
|
milliseconds by using lower S and defining the decimal in the format name..*/
|
|
PROC FORMAT;
|
|
picture MyMSdt other='%0Y-%0m-%0dT%0H:%0M:%0S' (datatype=datetime);
|
|
RUN;
|
|
%local dbnow;
|
|
%let dbnow="%sysfunc(datetime(),%mf_fmtdttm())"dt;
|
|
|
|
data _null_;
|
|
/* convert space separated macvar to comma separated for SQL processing */
|
|
call symputx('PK_COMMA',tranwrd(compbl("&pk"),' ',','),'L');
|
|
call symputx('PK_CNT',countw("&pk",' '),'L');
|
|
now=&dbnow;
|
|
call symputx('NOW',now,'L');
|
|
call symputx('SQLNOW',cats("'",put(now,MyMSdt.),"'"),'L');
|
|
length etlsource $100;
|
|
etlsource=subpad(symget('etlsource'),1,100);
|
|
call symputx('etlsource',etlsource,'l');
|
|
run;
|
|
|
|
/**
|
|
* Even if no PROCESSED var provided, assume that any variable named
|
|
* PROCESSED_DTTM should be updated
|
|
*/
|
|
%if &processed=0 %then %do;
|
|
%if %mf_existvar(&basecopy,PROCESSED_DTTM)
|
|
%then %let processed=PROCESSED_DTTM;
|
|
%else %let processed=;
|
|
%end;
|
|
|
|
|
|
/* extract colnames for md5 creation / change tracking */
|
|
proc contents noprint data=&base_lib..&base_dsn
|
|
out=work.bitemp_cols (keep=name type length varnum format:);
|
|
run;
|
|
proc sql noprint;
|
|
select name into: cols separated by ','
|
|
from work.bitemp_cols
|
|
where upcase(name) not in
|
|
(%upcase("&bus_from","&bus_to"
|
|
,"&tech_from","&tech_to"
|
|
,"&processed","&delete_col")) ;
|
|
select case when type in (2,6) then cats('put(md5(trim(',name,')),$hex32.)')
|
|
/* multiply by 1 to strip precision errors (eg 0 != 0) */
|
|
/* but ONLY if not missing, else will lose any special missing values */
|
|
else cats('put(md5(trim(put(ifn(missing('
|
|
,name,'),',name,',',name,'*1),binary64.))),$hex32.)') end
|
|
into: stripcols separated by '||'
|
|
from work.bitemp_cols
|
|
where upcase(name) not in
|
|
(%upcase("&bus_from","&bus_to"
|
|
,"&tech_from","&tech_to"
|
|
,"&processed","&delete_col")) ;
|
|
|
|
/* set default formats*/
|
|
%let bus_from_fmt = datetime19.;
|
|
%let bus_to_fmt = datetime19.;
|
|
%let processed_fmt = datetime19.;
|
|
|
|
%let tech_from_fmt = format=datetime19.;
|
|
%let tech_to_fmt = format=datetime19.;
|
|
|
|
|
|
%put &=stripcols;
|
|
%put &=pk;
|
|
|
|
data _null_;
|
|
set work.bitemp_cols;
|
|
if type=2 or type=6 then do;
|
|
length fmt $49.;
|
|
if format='' then fmt=cats('$',length,'.');
|
|
else fmt=cats(format,formatl,'.');
|
|
end;
|
|
else do;
|
|
if format='' then fmt=cats(length,'.');
|
|
else fmt=cats(format,formatl,'.',formatd);
|
|
end;
|
|
if upcase(name)="%upcase(&bus_from)" then
|
|
call symputx('bus_from_fmt',fmt,'L');
|
|
else if upcase(name)="%upcase(&bus_to)" then
|
|
call symputx('bus_to_fmt',fmt,'L');
|
|
else if upcase(name)="%upcase(&tech_from)" then
|
|
call symputx('tech_from_fmt',"format="!!fmt,'L');
|
|
else if upcase(name)="%upcase(&tech_to)" then
|
|
call symputx('tech_to_fmt',"format="!!fmt,'L');
|
|
else if upcase(name)="%upcase(&processed)" then
|
|
call symputx('processed_fmt',fmt,'L');
|
|
run;
|
|
|
|
%if %index(%quote(&cols),___TMP___) %then %do;
|
|
%let msg=%str(Table contains a variable name containing "___TMP___".%trim(
|
|
) This may conflict with temp variable generation!!);
|
|
%mp_abort(msg=&msg,mac=bitemporal_dataloader);
|
|
%let syscc=5;
|
|
%return;
|
|
%end;
|
|
|
|
/* if transaction dates appear on the APPEND table, need to remove them */
|
|
%local drop_tx_dates /* used in append table */
|
|
drop_tx_dates_noobs /* used to take the base table structure */;
|
|
%if %mf_existvar(&append_lib..&append_dsn, &tech_from)
|
|
%then %let drop_tx_dates=&tech_from;
|
|
%if %mf_existvar(&append_lib..&append_dsn, &tech_to)
|
|
%then %let drop_tx_dates=&drop_tx_dates &tech_to;
|
|
%if %length(%trim(&drop_tx_dates))>0
|
|
%then %let drop_tx_dates=(drop=&drop_tx_dates);
|
|
|
|
%if %mf_existvar(&basecopy, &tech_from)
|
|
%then %let drop_tx_dates_noobs=&tech_from;
|
|
%if %mf_existvar(&basecopy, &tech_to)
|
|
%then %let drop_tx_dates_noobs=&drop_tx_dates_noobs &tech_to;
|
|
%if %length(%trim(&drop_tx_dates_noobs))>0
|
|
%then %let drop_tx_dates_noobs=(drop=&drop_tx_dates_noobs obs=0);
|
|
%else %let drop_tx_dates_noobs=(obs=0);
|
|
|
|
|
|
/**
|
|
* Lock the table. This is necessary as we are doing a two part update (first
|
|
* closing records then appending new records). It is theoretically possible
|
|
* that an upload may occur whilst preparing the staging tables. And the
|
|
* staging tables are about to be prepared..
|
|
*/
|
|
%if &LOADTARGET = YES %then %do;
|
|
%put locking &base_lib..&base_dsn;
|
|
%mp_lockanytable(LOCK,
|
|
lib=&base_lib,ds=&base_dsn,ref=&ETLSOURCE,ctl_ds=&dclib..mpe_lockanytable
|
|
)
|
|
%if "&outds_audit" ne "0" %then %do;
|
|
%put locking &outds_audit;
|
|
%mp_lockanytable(LOCK
|
|
,lib=%scan(&outds_audit,1,.)
|
|
,ds=%scan(&outds_audit,2,.)
|
|
,ref=&ETLSOURCE
|
|
,ctl_ds=&dclib..mpe_lockanytable
|
|
)
|
|
%end;
|
|
%end;
|
|
%else %do;
|
|
/* not an actual load, so avoid updating the max key table in next step. */
|
|
%let rk_update_maxkeytable=NO;
|
|
%end;
|
|
|
|
%if %length(&RK_UNDERLYING)>0 %then %do;
|
|
%mp_retainedkey(
|
|
base_lib=&base_lib
|
|
,base_dsn=&base_dsn
|
|
,append_lib=&append_lib
|
|
,append_dsn=&append_dsn
|
|
,retained_key=&pk
|
|
,business_key=&rk_underlying
|
|
,check_uniqueness=&CHECK_UNIQUENESS
|
|
,outds=work.append
|
|
%if &rk_update_maxkeytable=NO %then %do;
|
|
,maxkeytable=0
|
|
%end;
|
|
%else %do;
|
|
,maxkeytable=&dclib..&RK_MAXKEYTABLE
|
|
%end;
|
|
,locktable=&dclib..mpe_lockanytable
|
|
%if &loadtype=BITEMPORAL or &loadtype=TXTEMPORAL %then %do;
|
|
,filter_str=%str( (where=( &now < &tech_to)) )
|
|
%end;
|
|
)
|
|
%end;
|
|
%else %do;
|
|
proc sql;
|
|
create view work.append as select * from &append_lib..&append_dsn;
|
|
%end;
|
|
/**
|
|
* generate md5 for append table
|
|
*/
|
|
/* it is possible the source dataset has additional (unwanted) columns.
|
|
Drop if specified; */
|
|
%if %length(&keepvars)>0 %then %do;
|
|
/* remove tech dates from keepvars as they are generated later */
|
|
%let keepvars=%sysfunc(tranwrd(%str( &keepvars ),%str( &tech_from ),%str( )));
|
|
%let keepvars=%sysfunc(tranwrd(%str( &keepvars ),%str( &tech_to ),%str( )));
|
|
%let keepvars=(keep=&keepvars &bus_from &bus_to &processed &md5_col);
|
|
%end;
|
|
|
|
/* CAS varchar types cause append issues here, so perform autoconvert
|
|
by creating empty local table first */
|
|
data;
|
|
set &base_lib..&base_dsn &drop_tx_dates_noobs;
|
|
run;
|
|
%local emptybasetable; %let emptybasetable=&syslast;
|
|
|
|
data work.bitemp0_append &keepvars &outds_del(drop=&md5_col )
|
|
%if "%substr(&sysver,1,1)" ne "4" and "%substr(&sysver,1,1)" ne "5" %then %do;
|
|
/nonote2err
|
|
%end;
|
|
;
|
|
/* apply formats for bitemporal vars but not tx dates which are added later */
|
|
%if %length(&keepvars)>0 and &loadtype=BITEMPORAL %then %do;
|
|
format &bus_from &bus_from_fmt;
|
|
format &bus_to &bus_to_fmt;
|
|
%end;
|
|
set &emptybasetable /* base table reqd in case append has fewer cols */
|
|
work.append &drop_tx_dates;
|
|
%if %length(%str(&bus_from_override))>0 %then %do;
|
|
&bus_from= %unquote(&bus_from_override) ;
|
|
%end;
|
|
%if %length(%str(&bus_to_override))>0 %then %do;
|
|
&bus_to= %unquote(&bus_to_override) ;
|
|
%end;
|
|
length &md5_col $32;
|
|
&md5_col=put(md5(&stripcols),hex32.);
|
|
%if %length(&processed)>0 %then %do;
|
|
format &processed &processed_fmt;
|
|
&processed=&now;
|
|
%end;
|
|
|
|
/**
|
|
* If a delete column exists then create the delete dataset
|
|
*/
|
|
%if %mf_existvar(&append_lib..&append_dsn, &delete_col) %then %do;
|
|
drop &delete_col;
|
|
if upcase(&delete_col) = "YES" then output &outds_del ;
|
|
else output work.bitemp0_append ;
|
|
run;
|
|
|
|
%if %mf_getattrn(&outds_del,NLOBS)>0 %then %do;
|
|
%bitemporal_closeouts(
|
|
tech_from=&tech_from
|
|
,tech_to = &tech_to
|
|
,base_lib=&base_lib
|
|
,base_dsn=&base_dsn
|
|
,append_lib=work
|
|
,append_dsn=%scan(&outds_del,-1,.)
|
|
,PK=&bus_from &pk
|
|
,NOW=&dbnow
|
|
,loadtarget=&loadtarget
|
|
,loadtype=&loadtype
|
|
)
|
|
%end;
|
|
%end;
|
|
%else %do;
|
|
output work.bitemp0_append;
|
|
run;
|
|
%end;
|
|
|
|
%mp_abort(iftrue= (&syscc gt 0 at line 494)
|
|
,mac=&_program
|
|
,msg=%str(syscc=&syscc)
|
|
)
|
|
|
|
%if %length(&close_vars)>0 %then %do;
|
|
/**
|
|
* need to close out records that are not provided
|
|
*/
|
|
proc sql;
|
|
create table bitemp1_closevars1 as
|
|
select distinct a.%mf_getquotedstr(in_str=&pk,dlm=%str(,a.),quote=)
|
|
from &base_lib..&base_dsn a
|
|
inner join work.bitemp0_append b
|
|
on 1=1
|
|
/* join on closevars key */
|
|
%do idx_pk=1 %to %sysfunc(countw(&close_vars));
|
|
%let idx_val=%scan(&close_vars,&idx_pk);
|
|
and a.&idx_val=b.&idx_val
|
|
%end;
|
|
/* filter base on tech dates if necessary */
|
|
%if &loadtype=TXTEMPORAL %then %do;
|
|
where a.&tech_from <=&now and &now < a.&tech_to
|
|
%end;
|
|
;
|
|
create table bitemp1_closevars2 as
|
|
select distinct a.*
|
|
from bitemp1_closevars1 a
|
|
left join work.bitemp0_append b
|
|
on 1=1
|
|
/* join on primary key */
|
|
%do idx_pk=1 %to %sysfunc(countw(&pk));
|
|
%let idx_val=%scan(&pk,&idx_pk);
|
|
and a.&idx_val=b.&idx_val
|
|
%end;
|
|
/* identify removed records by null value in a field in PK but not close_vars
|
|
*/
|
|
where b.%scan(
|
|
%mf_wordsInStr1ButNotStr2(Str1=&pk,Str2=&close_vars),1,%str( )
|
|
) IS NULL
|
|
;
|
|
|
|
%if %mf_getattrn(bitemp1_closevars2,NLOBS)>0 %then %do;
|
|
%bitemporal_closeouts(
|
|
tech_from=&tech_from
|
|
,tech_to = &tech_to
|
|
,base_lib=&base_lib
|
|
,base_dsn=&base_dsn
|
|
,append_lib=work
|
|
,append_dsn=bitemp1_closevars2
|
|
,PK=&bus_from &pk
|
|
,NOW=&dbnow
|
|
,loadtarget=&loadtarget
|
|
,loadtype=&loadtype
|
|
)
|
|
%end;
|
|
%end;
|
|
|
|
/* return if nothing to load (was just deletes) */
|
|
%if %mf_getattrn(work.bitemp0_append,NLOBS)=0 %then %do;
|
|
%put NOTE:; %put NOTE-;%put NOTE-;%put NOTE-;
|
|
%put NOTE- No updates - just deletes!;
|
|
%put NOTE-;%put NOTE-;%put NOTE-;
|
|
%end;
|
|
|
|
|
|
/**
|
|
* If applying manual overrides to business dates, then the input table MUST
|
|
* be unique on the PK. Check, and if not - abort.
|
|
*/
|
|
%local msg;
|
|
%if %length(&bus_from_override.&bus_to_override)>0 or &CHECK_UNIQUENESS=YES
|
|
%then %do;
|
|
proc sort data=work.bitemp0_append out=work.bitemp0_check nodupkey;
|
|
by &pk;
|
|
run;
|
|
%if %mf_getattrn(work.bitemp0_check,NLOBS)
|
|
ne %mf_getattrn(work.bitemp0_append,NLOBS)
|
|
%then %do;
|
|
%let msg=INPUT table &append_lib..&append_dsn is not unique on PK (&pk);
|
|
%mp_lockanytable(UNLOCK,lib=&base_lib,ds=&base_dsn,ref=&ETLSOURCE (&msg),
|
|
ctl_ds=&dclib..mpe_lockanytable
|
|
)
|
|
%mp_lockanytable(UNLOCK
|
|
,lib=%scan(&outds_audit,1,.)
|
|
,ds=%scan(&outds_audit,2,.)
|
|
,ref=&ETLSOURCE
|
|
,ctl_ds=&dclib..mpe_lockanytable
|
|
)
|
|
%mp_abort(msg=&msg,mac=bitemporal_dataloader.sas);
|
|
%end;
|
|
%end;
|
|
|
|
|
|
/**
|
|
* extract from BASE table. Only want matching records, as could be very BIG.
|
|
* New records are subsequently identified via left join and test for nulls.
|
|
*/
|
|
%local temp_table temp_table2 base_table baselib_schema;
|
|
%put DCNOTE: Extracting matching observations from &base_lib..&base_dsn;
|
|
|
|
%if &engine_type=OLEDB %then %do;
|
|
%let temp_table=##%mf_getuniquefileref(prefix=BTMP)_&base_dsn;
|
|
%if &loadtype=BITEMPORAL or &loadtype=TXTEMPORAL %then
|
|
%let base_table=(select * from [dbo].&base_dsn
|
|
where convert(datetime,&SQLNOW) < &tech_to );
|
|
%else %let base_table=[dbo].&base_dsn;
|
|
proc sql;
|
|
create table &base_lib.."&temp_table"n as
|
|
select * from work.bitemp0_append;
|
|
/* open up a connection for pass through SQL */
|
|
%dc_assignlib(WRITE,&base_lib,passthru=myAlias)
|
|
create table work.bitemp0_base as select * from connection to myAlias(
|
|
%end;
|
|
%else %if &engine_type=REDSHIFT or &engine_type=POSTGRES %then %do;
|
|
/* grab schema */
|
|
%let baselib_schema=%mf_getschema(&base_lib);
|
|
%if &baselib_schema.X ne X %then %let baselib_schema=&baselib_schema..;
|
|
|
|
/* grab redshift config */
|
|
%local redcnt; %let redcnt=0;
|
|
%if &engine_type=REDSHIFT %then %do;
|
|
data _null_;
|
|
set &config_table(where=(var_scope='DCBL_REDSH' and var_active=1));
|
|
x+1;
|
|
call symputx(cats('rednm',x),var_value,'l');
|
|
call symputx(cats('redval',x),var_value,'l');
|
|
call symputx('redcnt',x,'l');
|
|
run;
|
|
%end;
|
|
/* cannot persist temp tables so must create a temporary permanent table */
|
|
%let temp_table=%mf_getuniquename(prefix=XDCTEMP);
|
|
%if &loadtype=BITEMPORAL or &loadtype=TXTEMPORAL %then
|
|
%let base_table=(select * from &baselib_schema.&base_dsn
|
|
where timestamp &sqlnow < &tech_to );
|
|
%else %let base_table=&baselib_schema.&base_dsn;
|
|
/* make empty table first - must clone & drop extra cols as autoload is bad */
|
|
%dc_assignlib(WRITE,&base_lib,passthru=myAlias)
|
|
|
|
exec (create table &temp_table (like &baselib_schema.&base_dsn)) by myAlias;
|
|
%if &engine_type=REDSHIFT %then %do;
|
|
exec (alter table &temp_table alter sortkey none) by myAlias;
|
|
%end;
|
|
%local dropcols;
|
|
%let dropcols=%mf_wordsinstr1butnotstr2(
|
|
str1=%upcase(%mf_getvarlist(&basecopy))
|
|
,str2=%upcase(&pk)
|
|
);
|
|
%if %length(&dropcols>0) %then %do idx_pk=1 %to %sysfunc(countw(&dropcols));
|
|
%put &=dropcols;
|
|
%let idx_val=%scan(&dropcols,&idx_pk);
|
|
exec(alter table &temp_table drop column &idx_val;) by myAlias;
|
|
%end;
|
|
exec (alter table &temp_table add column &md5_col varchar(32);) by myAlias;
|
|
/* create view to strip formats and avoid warns in log */
|
|
data work.vw_bitemp0/view=work.vw_bitemp0;
|
|
set work.bitemp0_append(keep=&pk &md5_col);
|
|
format _all_;
|
|
run;
|
|
proc append base=&base_lib..&temp_table
|
|
%if &engine_type=REDSHIFT %then %do;
|
|
(
|
|
%do idx_pk=1 %to &redcnt;
|
|
&&rednm&idx_pk = &&redval&idxpk
|
|
%end;
|
|
)
|
|
%end;
|
|
data=work.vw_bitemp0 force nowarn;
|
|
run;
|
|
/* open up a connection for pass through SQL */
|
|
%dc_assignlib(WRITE,&base_lib,passthru=myAlias)
|
|
create table work.bitemp0_base as select * from connection to myAlias(
|
|
%end;
|
|
%else %if &engine_type=CAS %then %do;
|
|
%if &loadtype=BITEMPORAL or &loadtype=TXTEMPORAL %then
|
|
%let base_table=&base_lib..&base_dsn
|
|
(where=(&tech_from <=&now and &now < &tech_to));
|
|
%else %let base_table=&base_lib..&base_dsn;
|
|
%let temp_table=CASUSER.%mf_getuniquename(prefix=DC);
|
|
data &temp_table;
|
|
set work.bitemp0_append;
|
|
run;
|
|
%let bitemp0base=CASUSER.%mf_getuniquename(prefix=DC);
|
|
proc fedsql sessref=dcsession;
|
|
create table &bitemp0base{options replace=true} as
|
|
%end;
|
|
%else %do;
|
|
%let temp_table=work.bitemp0_append;
|
|
%if &loadtype=BITEMPORAL or &loadtype=TXTEMPORAL %then
|
|
%let base_table=&base_lib..&base_dsn
|
|
(where=(&tech_from <=&now and &now < &tech_to));
|
|
%else %let base_table=&base_lib..&base_dsn;
|
|
proc sql;
|
|
create table work.bitemp0_base as
|
|
%end;
|
|
|
|
select a.&md5_col /* this identifies NEW records */
|
|
, b.*
|
|
/* assume first PK field cannot be null (if defined in a PK constraint then
|
|
it definitely cannot be null) */
|
|
, case when b.%scan(&pk,1) IS NULL then 1 else 0 end as ___TMP___NEW_FLG
|
|
from &baselib_schema.&temp_table a
|
|
left join &base_table b
|
|
on 1=1
|
|
%do idx_pk=1 %to &pk_cnt;
|
|
%let idx_val=%scan(&pk,&idx_pk);
|
|
and a.&idx_val=b.&idx_val
|
|
%end;
|
|
|
|
|
|
%if &engine_type=OLEDB or &engine_type=REDSHIFT or &engine_type=POSTGRES
|
|
%then %do;
|
|
); proc sql; drop table &base_lib.."&temp_table"n;
|
|
%end;
|
|
%else %if &engine_type=CAS %then %do;
|
|
;
|
|
quit;
|
|
data work.bitemp0_base;
|
|
set &bitemp0base;
|
|
run;
|
|
proc sql;
|
|
drop table &temp_table;
|
|
drop table &bitemp0base;
|
|
%end;
|
|
%else %do;
|
|
;
|
|
%end;
|
|
|
|
/**
|
|
* matching & changed records are those without NULL key values
|
|
* &idx_val resolves to rightmost PK value (loop above)
|
|
*/
|
|
%put syscc (line525)=&syscc, sqlrc=&sqlrc;
|
|
%mp_abort(iftrue= (&syscc gt 0 or &sqlrc>0)
|
|
,mac=&_program
|
|
,msg=%str(syscc=&syscc sqlrc=&sqlrc)
|
|
)
|
|
|
|
%put hashcols2=&stripcols;
|
|
proc sql;
|
|
create table work.bitemp1_current(drop=___TMP___NEW_FLG) as
|
|
select *
|
|
, put(md5(&stripcols),$hex32.) as &md5_col
|
|
from work.bitemp0_base (drop=&md5_col)
|
|
where ___TMP___NEW_FLG=0;
|
|
|
|
/**
|
|
* NEW records were identified in ___TMP___NEW_FLG in bitemp0_base
|
|
*/
|
|
proc sql;
|
|
create table &outds_add
|
|
(drop=&md5_col
|
|
%if %mf_existvar(work.bitemp0_base, &delete_col) %then %do;
|
|
&delete_col
|
|
%end;
|
|
)
|
|
as select a.*
|
|
%if &loadtype=BITEMPORAL or &loadtype=TXTEMPORAL %then %do;
|
|
,&now as &tech_from &tech_from_fmt
|
|
,&high_date as &tech_to &tech_to_fmt
|
|
%end;
|
|
from work.bitemp0_append a /* STAGING records (mix of existing & new) */
|
|
, work.bitemp0_base b /* BASE records (contains null values for new) */
|
|
where a.&md5_col=b.&md5_col /* took staging md5 across in left join */
|
|
and b.___TMP___NEW_FLG=1; /* NEW records also identified in bitemp0_base */
|
|
|
|
|
|
/**
|
|
* identify INSERTS. These are records with the same business key but
|
|
* the bus_from and bus_to value are higher / lower (respectively)
|
|
* such that the existing record needs to be SPLIT to surround the new
|
|
* record.
|
|
* eg: OLD RECORD from=1 to=10
|
|
* NEW RECORD from=5 to=7
|
|
*
|
|
* APPENDED RECORDS:
|
|
* - from=1 to=5
|
|
* - from=5 to=7
|
|
* - from=7 to=10
|
|
*/
|
|
|
|
/* inserts cannot happen with TXTEMPORAL */
|
|
%if &loadtype=BITEMPORAL or &loadtype=BUSTEMPORAL %then %do;
|
|
/* IDENTIFY */
|
|
create table work.bitemp3_inserts as
|
|
select b.*
|
|
,a.&bus_from as ___TMP___from
|
|
,a.&bus_to as ___TMP___to
|
|
from work.bitemp0_append a
|
|
,work.bitemp1_current b
|
|
where a.&bus_from > b.&bus_from
|
|
and a.&bus_to < b.&bus_to
|
|
%do idx_pk=1 %to &pk_cnt;
|
|
%let idx_val=%scan(&pk,&idx_pk);
|
|
and a.&idx_val=b.&idx_val
|
|
%end;
|
|
order by
|
|
/* compress blanks and then insert commas (as the datetime fields may
|
|
not be in use) */
|
|
%sysfunc(tranwrd(%sysfunc(compbl(
|
|
&pk &bus_from &bus_to &processed
|
|
)),%str( ), %str(,)))
|
|
;
|
|
|
|
/* SPLIT */
|
|
data work.bitemp3a_inserts (drop=___TMP___from ___TMP___retain ___TMP___to) ;
|
|
set work.bitemp3_inserts;
|
|
by &pk &bus_from &bus_to &processed;
|
|
if first.&idx_val then do;
|
|
___TMP___retain=&bus_to;
|
|
&bus_to=___TMP___from;
|
|
output;
|
|
&bus_to=___TMP___retain;
|
|
end;
|
|
if last.&idx_val then do;
|
|
&bus_from=___TMP___to;
|
|
output;
|
|
end;
|
|
run;
|
|
%end;
|
|
%else %do;
|
|
/* TX temporal load */
|
|
data work.bitemp3a_inserts;
|
|
set work.bitemp1_current;
|
|
stop;
|
|
run;
|
|
%end;
|
|
/* APPEND */
|
|
proc sql;
|
|
create view work.bitemp3a_view as
|
|
select * from work.bitemp1_current
|
|
where &md5_col not in (select &md5_col from work.bitemp3a_inserts);
|
|
|
|
data bitemp3b_newbase;
|
|
set work.bitemp3a_inserts work.bitemp3a_view;
|
|
run;
|
|
|
|
/** do not use! this converts short numerics into 8 bytes
|
|
proc sql;
|
|
create table work.bitemp3b_newbase as
|
|
select * from work.bitemp3a_inserts
|
|
union corr
|
|
select * from work.bitemp1_current
|
|
where &md5_col not in (select &md5_col from work.bitemp3a_inserts);
|
|
*/
|
|
|
|
/**
|
|
* identify CHANGED records from staging.
|
|
* Same business key with different temporal dates or md5 value
|
|
* This table must be overlayed onto / into existing business history
|
|
*/
|
|
proc sql;
|
|
create table work.bitemp4_updated as select distinct a.*
|
|
from work.bitemp0_append a
|
|
,work.bitemp3b_newbase b
|
|
where 1=1
|
|
%do idx_pk=1 %to &pk_cnt;
|
|
%let idx_val=%scan(&pk,&idx_pk);
|
|
and a.&idx_val=b.&idx_val
|
|
%end;
|
|
and ( a.&md5_col ne b.&md5_col
|
|
%if &loadtype=BITEMPORAL or &loadtype=BUSTEMPORAL %then %do;
|
|
OR (a.&bus_from ne b.&bus_from or a.&bus_to ne b.&bus_to)
|
|
%end;
|
|
)
|
|
;
|
|
|
|
/**
|
|
* This section would have been one simple step with union all
|
|
* but that converts short numerics into 8 bytes!
|
|
* so, convoluted alternative to retain the same functionality.
|
|
*/
|
|
|
|
/* base records */
|
|
create view work.bitemp4_prep1 as
|
|
select 'BASE' as ___TMP___
|
|
,b.*
|
|
from work.bitemp4_updated a
|
|
,work.bitemp3b_newbase b
|
|
where 1
|
|
%do idx_pk=1 %to &pk_cnt;
|
|
%let idx_val=%scan(&pk,&idx_pk);
|
|
and a.&idx_val=b.&idx_val
|
|
%end;
|
|
;
|
|
/* updated records */
|
|
create view work.bitemp4_prep2 as
|
|
select 'STAG' as ___TMP___ ,*
|
|
from work.bitemp4_updated;
|
|
/* ensure we only keep columns that appear in both */
|
|
%local bp1 bp2 bp3 bp4;
|
|
%let bp1=%mf_getvarlist(bitemp4_prep1);
|
|
%let bp2=%mf_getvarlist(bitemp4_prep2);
|
|
%let bp3=%mf_wordsInStr1ButNotStr2(Str1=&bp1,Str2=&bp2);
|
|
%let bp4=%mf_wordsInStr1ButNotStr2(Str1=&bp2,Str2=&bp1);
|
|
data work.bitemp4_prep3/view=bitemp4_prep3;
|
|
set bitemp4_prep1 bitemp4_prep2;
|
|
%if %length(XX&bp3&bp4)>2 %then %do;
|
|
drop &bp3 &bp4 ;
|
|
%end;
|
|
run;
|
|
/**
  * Remove duplicates and order by business key, then the business dates
  * and the processed datetime.
  * The datetime fields may not be in use (empty macro variables), so the
  * list is compressed to single blanks before the blanks become commas.
  */
%local allrecs_order;
%let allrecs_order=%sysfunc(tranwrd(%sysfunc(compbl(
  &pk &bus_from &bus_to &processed
  )),%str( ), %str(,)));

proc sql;
create table work.bitemp4a_allrecs as
  select distinct *
    from work.bitemp4_prep3
    order by &allrecs_order;
|
|
|
|
%if &loadtype=BITEMPORAL or &loadtype=BUSTEMPORAL %then %do;
  /**
    * First (forward) pass - aligns the business dates
    * (eg for inserts or overlaps in the range).
    * Input is ordered by PK, bus_from, bus_to, processed, and BASE and STAG
    * rows for the same PK are interleaved.
    * - consecutive rows with the same md5 are merged: bus_from is carried
    *   forward, and bus_to is extended when the prior bus_to was higher
    * - rows following a STAG row: when their md5 differs and bus_from is
    *   not after the prior bus_to, bus_from is moved to that bus_to; a row
    *   left with no business range is deleted
    * PK group boundaries are detected with first. on the LAST PK variable,
    * which is set whenever any PK variable changes. That variable is taken
    * from pk / pk_cnt directly rather than relying on idx_val having been
    * left behind by an earlier, unrelated loop.
    */
  data work.bitemp4b_firstpass (drop=___TMP___cond ___TMP___from ___TMP___to );
    set work.bitemp4a_allrecs;
    by &pk &bus_from &bus_to &processed;
    retain ___TMP___cond 'Name of Condition';
    retain ___TMP___from ___TMP___to 0;
    /* lag runs on every row so that its queue stays in step with the data */
    ___TMP___md5lag=lag(&md5_col);

    /* reset retained variables */
    if first.%scan(&pk,&pk_cnt) then do;
      call missing (___TMP___cond, ___TMP___from, ___TMP___to,___TMP___md5lag);
    end;
    else do;
      /* if record is identical, carry forward bus_from (and bus_to if higher)*/
      if &md5_col=___TMP___md5lag then do;
        &bus_from=___TMP___from;
        if &bus_to<___TMP___to then &bus_to=___TMP___to;
      end;
    end;

    if ___TMP___='STAG' then do;
      /* need to carry forward the closing record */
      ___TMP___cond='Condition 1';
    end;
    else if ___TMP___cond='Condition 1' then do;
      /* else ensure bus_from starts from prior record bus_to */
      if &md5_col ne ___TMP___md5lag and &bus_from <= ___TMP___to
        then &bus_from= ___TMP___to;
      /* new record may replace old record entirely */
      if &bus_to <= &bus_from then delete;
      else call missing (___TMP___cond, ___TMP___from, ___TMP___to);
    end;

    ___TMP___from=&bus_from;
    ___TMP___to=&bus_to;
  run;
%end;
%else %do;
  /* keep staged records only */
  data work.bitemp4b_firstpass;
    set work.bitemp4a_allrecs;
    if ___TMP___='STAG';
  run;
%end;
|
|
|
|
/**
  * Next phase is to pass through in reverse - so set up the sort order:
  * every PK variable descending, then (for business-temporal loads) the
  * business dates descending, and finally ___TMP___ descending. When
  * matching business dates are supplied, this also gives a sort between
  * the BASE and STAGING records: STAG sorts ahead of BASE.
  * byvar is blanked before it is built up, so that a value already held
  * in the local symbol table cannot leak into the BY list.
  */
%local byvar;
%let byvar=;
%do idx_pk=1 %to &pk_cnt;
  %let byvar=&byvar descending %scan(&pk,&idx_pk);
%end;
%if &loadtype=BITEMPORAL or &loadtype=BUSTEMPORAL
  %then %let byvar=&byvar descending &bus_from descending &bus_to;
%let byvar=&byvar descending ___TMP___;

proc sort data=work.bitemp4b_firstpass out=work.bitemp4c_sort ;
  by &byvar;
run;
|
|
|
|
/**
  * Second (reverse) pass - pass back business end dates.
  * Rows arrive sorted descending by PK, business dates and ___TMP___.
  * Within a PK group, each row is therefore compared with the row that
  * follows it in business time.
  * - consecutive rows with the same md5 take the bus_to of the later row
  * - rows following a STAG row: when their md5 differs and bus_to is not
  *   before the STAG bus_from, bus_to is moved back to that bus_from. The
  *   row is deleted if no business range remains, or if it exactly matches
  *   the retained range.
  * For BITEMPORAL / TXTEMPORAL loads the technical from / to columns are
  * set from the now and high_date macro variables.
  * PK group boundaries use first. on the LAST PK variable, taken directly
  * from pk / pk_cnt instead of a leftover idx_val value.
  */
data work.bitemp4d_secondpass;
  %if &loadtype=BITEMPORAL or &loadtype=TXTEMPORAL %then %do;
    &tech_from=&now;
    &tech_to=&high_date;
  %end;
  set work.bitemp4c_sort ;
  by &byvar;
  retain ___TMP___cond 'Name of Condition';
  retain ___TMP___from ___TMP___to 0;
  %if &loadtype=BITEMPORAL or &loadtype=BUSTEMPORAL %then %do;
    /* lag runs on every row so that its queue stays in step with the data */
    ___TMP___md5lag=lag(&md5_col);
    if first.%scan(&pk,&pk_cnt) then do;
      /* reset retained variables */
      call missing (___TMP___cond,___TMP___from,___TMP___to,___TMP___md5lag);
    end;
    else do;
      /* if record is identical, carry back bus_to */
      if &md5_col=___TMP___md5lag then &bus_to=___TMP___to;
    end;

    if ___TMP___='STAG' then do;
      /* need to carry forward the closing record */
      ___TMP___cond='Condition 2';
    end;
    else if ___TMP___cond='Condition 2' then do;
      /* else ensure bus_to stops at subsequent record bus_from */
      if &md5_col ne ___TMP___md5lag and &bus_to >= ___TMP___from
        then &bus_to= ___TMP___from;
      /* new record may replace old record entirely */
      if &bus_from >= &bus_to then delete;
      if &bus_from=___TMP___from and &bus_to=___TMP___to then delete;
      else call missing (___TMP___cond, ___TMP___from, ___TMP___to);
    end;

    ___TMP___from=&bus_from;
    ___TMP___to=&bus_to;
  %end;
run;

%put syscc (line600)=&syscc;
|
|
/**
  * There may still be some records (eg old business history) which have
  * not changed. These are identified and removed from the append so they
  * are not updated unnecessarily.
  * For BITEMPORAL loads this is done by generating a new md5 that INCLUDES
  * the business from / to dates. Any record whose new md5 already exists in
  * the base is split out from those that need to be updated.
  * Also builds work.bitemp5d_subquery (the keys to close out) and sets
  * lastds to the table holding the changed records.
  */
%let lastds=work.bitemp4d_secondpass;

%if &loadtype=BITEMPORAL %then %do;
  %let cat_string=catx('|' ,&bus_from,&bus_to);

  /* md5 of every existing base record, business dates included */
  data work.bitemp5a_lkp (keep=&md5_col)
    %if "%substr(&sysver,1,1)" ne "4" and "%substr(&sysver,1,1)" ne "5" %then %do;
      /nonote2err
    %end;
    ;
    set work.bitemp0_base;
    &md5_col=put(md5(&cat_string!!'|'!!&stripcols),$hex32.);
  run;

  /* keep only records with no identical counterpart in the base */
  data bitemp5b_updates;
    set bitemp4d_secondpass;
    if _n_=1 then do;
      declare hash md5_lkp(dataset:'bitemp5a_lkp');
      md5_lkp.definekey("&md5_col");
      md5_lkp.definedone();
    end;
    /* replace the old md5 value, rebuilt with the new business dates */
    &md5_col=put(md5(&cat_string!!'|'!!&stripcols),$hex32.) ;
    if md5_lkp.check() ne 0;
  run;

  proc sql;
  /* earliest bus_from / latest bus_to per PK - all records for that PK
    are updated (closed out) from this point */
  create table work.bitemp5d_subquery as
    select &pk_comma
      ,min(&bus_from) as &bus_from
      ,max(&bus_to) as &bus_to
    from work.bitemp5b_updates
    group by &pk_comma;
  /* index has a huge efficiency impact on upcoming nested subquery */
  create index index1 on work.bitemp5d_subquery(&pk_comma,&bus_from, &bus_to);

  %let lastds=work.bitemp5b_updates;
%end;
%else %if &loadtype=TXTEMPORAL or &loadtype=UPDATE %then %do;
  proc sql;
  create table work.bitemp5d_subquery as
    select distinct &pk_comma
    from bitemp4d_secondpass;
%end;
|
|
|
|
/**
  * Build the single append table. An overlapped pre-sert may be classed
  * as both an update AND a new record. Temp views are also created here
  * that may be used for pre-load analysis.
  */

/* UPDATED records, minus the ___TMP___ work columns and the md5 column */
data &outds_mod;
  set &lastds(drop=___TMP___: &md5_col);
run;

/* UPDATED records followed by NEW records */
data bitemp6_allrecs / view=bitemp6_allrecs;
  set &outds_mod &outds_add;
run;

/* drop exact duplicate rows; any duplicates are written to xx_BADBADBAD */
proc sort data=work.bitemp6_allrecs out=work.bitemp6_unique
  noduprec dupout=work.xx_BADBADBAD;
  by _all_;
run;
|
|
|
|
/* all temp tables now exist - stop here if that is all that is needed */
%if &LOADTARGET ne YES %then %return;

/* on an err condition, release the table locks before aborting */
%if &syscc>0 %then %do;
  %put syscc=&syscc;
  %mp_lockanytable(UNLOCK
    ,lib=&base_lib
    ,ds=&base_dsn
    ,ref=&ETLSOURCE
    ,ctl_ds=&dclib..mpe_lockanytable
  )
  %if "&outds_audit" ne "0" %then %do;
    %mp_lockanytable(UNLOCK
      ,lib=%scan(&outds_audit,1,.)
      ,ds=%scan(&outds_audit,2,.)
      ,ref=&ETLSOURCE
      ,ctl_ds=&dclib..mpe_lockanytable
    )
  %end;
%end;
%mp_abort(iftrue= (&syscc>0)
  ,mac=&sysmacroname in &_program
  ,msg=%str(Bitemporal transform / job aborted due to SYSCC=&SYSCC status)
)

/* final check - abort if a lock has appeared on the target or audit table */
%mp_lockfilecheck(libds=&base_lib..&base_dsn)
%if %mf_existds(&outds_audit) %then %do;
  %mp_lockfilecheck(libds=&outds_audit)
%end;
|
|
|
|
/**
  * STAGING TABLES PREPARED, ERR CONDITION TESTED FOR.. NOW TO LOAD!!
  */

/**
  * First, CLOSE OUT changed records (if not a REPLACE).
  * Note that SAS does not support ANSI standard for UPDATE with a join
  * condition. However - this can be worked around using a nested subquery..
  *
  * Branches:
  * - lastds has no observations: nothing to close out
  * - CAS engine: rows whose keys are in work.bitemp5d_subquery are removed
  *   with table.deleteRows (BITEMPORAL / TXTEMPORAL abort here)
  * - BITEMPORAL / TXTEMPORAL / UPDATE: one SQL UPDATE (tech_to closeout) or
  *   DELETE (UPDATE), joined to the keys via an EXISTS subquery. For OLEDB,
  *   REDSHIFT and POSTGRES this runs as explicit pass-through against a
  *   temp table uploaded to the database, which is dropped afterwards.
  */
data _null_;
  putlog "&sysmacroname: CLOSEOUTS commencing";
run;

%if %mf_getattrn(&lastds,NLOBS)=0 %then %do;
  data _null_;
    putlog "&sysmacroname: No closeouts needed";
  run;
%end;
%else %if &engine_type=CAS %then %do;
  %mp_abort(iftrue= (&loadtype=BITEMPORAL or &loadtype=TXTEMPORAL)
    ,mac=&sysmacroname in &_program
    ,msg=%str(&loadtype not yet supported in CAS engine)
  )
  /* create temp table for deletions */
  %local delds;%let delds=%mf_getuniquename(prefix=DC);
  data casuser.&delds;
    set work.bitemp5d_subquery;
  run;
  /* delete the records */
  proc cas ;
    table.deleteRows / table={
      caslib="&base_lib",
      name="&base_dsn",
      where="1=1",
      whereTable={caslib='CASUSER',name="&delds"}
    };
  quit;
  /* drop temp table */
  proc sql;
  drop table CASUSER.&delds;
%end;
%else %if (&loadtype=BITEMPORAL or &loadtype=TXTEMPORAL or &loadtype=UPDATE)
%then %do;
  data _null_;
    putlog "&sysmacroname: &loadtype operation using &engine_type engine";
  run;
  %local flexinow;
  proc sql;
  /* if OLEDB then create a temp table for efficiency */
  %local innertable;
  %if &engine_type=OLEDB %then %do;
    %let innertable=[&temp_table];
    %let top_table=[dbo].&base_dsn;
    %let flexinow=&SQLNOW;
    create table &base_lib.."&temp_table"n as
      select * from work.bitemp5d_subquery;
    /* open up a connection for pass through SQL */
    %dc_assignlib(WRITE,&base_lib,passthru=myAlias)
    execute(
  %end;
  %else %if &engine_type=REDSHIFT or &engine_type=POSTGRES %then %do;
    %let innertable=%mf_getuniquename(prefix=XDCTEMP);
    %let top_table=&baselib_schema.&base_dsn;
    %let flexinow=timestamp &SQLNOW;
    /* make empty table first - must clone & drop extra cols
      as autoload is bad */
    %dc_assignlib(WRITE,&base_lib,passthru=myAlias)
    exec (create table &innertable (like &baselib_schema.&base_dsn)) by myAlias;
    %if &engine_type=REDSHIFT %then %do;
      exec (alter table &innertable alter sortkey none) by myAlias;
    %end;
    %local dropcols;
    %let dropcols=%mf_wordsinstr1butnotstr2(
      str1=%upcase(%mf_getvarlist(&basecopy))
      ,str2=%upcase(%mf_getvarlist(work.bitemp5d_subquery))
    );
    %put &=dropcols;
    /* only loop when there is something to drop - countw() with no
      argument would raise an error */
    %if %length(&dropcols)>0 %then %do;
      %do idx_pk=1 %to %sysfunc(countw(&dropcols));
        %let idx_val=%scan(&dropcols,&idx_pk);
        exec(alter table &innertable drop column &idx_val;) by myAlias;;
      %end;
    %end;
    /* create view to strip formats and avoid warns in log */
    data work.vw_bitemp5d/view=work.vw_bitemp5d;
      set work.bitemp5d_subquery;
      format _all_;
    run;
    /* each option name is paired with the value of the same index */
    proc append base=&base_lib..&innertable (
      %do idx_pk=1 %to &redcnt;
        &&rednm&idx_pk = &&redval&idx_pk
      %end;
      )
      data=work.vw_bitemp5d force nowarn;
    run;
    /* open up a connection for pass through SQL */
    %dc_assignlib(WRITE,&base_lib,passthru=myAlias)
    execute(
  %end;
  %else %do;
    %let innertable=bitemp5d_subquery;
    %let top_table=&base_lib..&base_dsn;
    %let flexinow=&now;
  %end;

  %if &loadtype=BITEMPORAL or &loadtype=TXTEMPORAL %then %do;
    update &top_table set &tech_to=&flexinow
      %if %length(&processed)>0 %then %do;
        ,&processed=&flexinow
      %end;
      where &tech_from <= &flexinow and &flexinow < &tech_to and
  %end;
  %else %if &loadtype=UPDATE %then %do;
    /* changed records are deleted then re-appended when doing UPDATEs */
    delete from &top_table where
  %end;
  %else %do;
    /* NOTE(review): unreachable - the enclosing branch only admits
      BITEMPORAL, TXTEMPORAL and UPDATE */
    %put %str(ERR)OR: BUSTEMPORAL NOT YET SUPPORTED;
    %let syscc=5;
    %mp_lockanytable(UNLOCK,lib=&base_lib,ds=&base_dsn,ref=&ETLSOURCE,
      ctl_ds=&dclib..mpe_lockanytable
    )
    %if "&outds_audit" ne "0" %then %do;
      %mp_lockanytable(UNLOCK
        ,lib=%scan(&outds_audit,1,.)
        ,ds=%scan(&outds_audit,2,.)
        ,ref=&ETLSOURCE
        ,ctl_ds=&dclib..mpe_lockanytable
      )
    %end;
    %goto end_of_macro;
  %end;

  /* perform join inside query as per
    http://stackoverflow.com/questions/24629793/update-with-a-proc-sql */
  exists( select 1 from &baselib_schema.&innertable where
    /* loop PK join */
    %do idx_pk=1 %to &pk_cnt;
      %let idx_val=%scan(&pk,&idx_pk);
      &base_dsn..&idx_val=&innertable..&idx_val and
    %end;
    %if &loadtype=BITEMPORAL %then %do;
      &base_dsn..&bus_from >= &innertable..&bus_from
      and &base_dsn..&bus_to <= &innertable..&bus_to and
    %end;
    /* close the statement */
    1=1);

  %if &engine_type=OLEDB or &engine_type=REDSHIFT or &engine_type=POSTGRES
  %then %do;
    ) by myAlias;
    execute (drop table &baselib_schema.&innertable) by myAlias;
  %end;
%end;
quit;

data _null_;
  putlog "&sysmacroname: Closeout complete";
run;
|
|
/**
  * Append the new / updated records (work.bitemp6_unique) to the target.
  * - CAS: columns listed with type=6 in work.bitemp_cols are cast to
  *   varchar via a temporary CASUSER table, which is appended then dropped
  * - REDSHIFT / POSTGRES: PROC APPEND; for REDSHIFT the rednm / redval
  *   pairs are applied as dataset options on the base table
  * - any other engine: plain PROC APPEND
  */
%if &engine_type=CAS %then %do;

  /* get varchar variables ready for casting */
  %local vcfmt vcrename vcassign vcdrop tmp;
  data _null_;
    set work.bitemp_cols(where=(type=6)) end=last;
    length vcrename vcassign vcdrop vcfmt $32767 rancol $32;
    retain vcrename vcassign vcdrop vcfmt;
    if _n_=1 then vcrename='(rename=(';
    rancol=resolve('%mf_getuniquename()');
    vcfmt=trim(vcfmt)!!'length '!!cats(name)!!' varchar(*);';
    vcrename=trim(vcrename)!!' '!!cats(name,'=',rancol);
    vcassign=cats(vcassign,name,'=',rancol,';');
    vcdrop=cats(vcdrop,'drop '!!rancol,';');
    if last then do;
      vcrename=cats(vcrename,'))');
      call symputx('vcfmt',vcfmt);
      call symputx('vcrename',vcrename);
      call symputx('vcassign',vcassign);
      call symputx('vcdrop',vcdrop);
    end;
  run;

  /* prepare a temp cas table with varchars casted
    (tmp is local so a global of the same name is not overwritten) */
  %let tmp=%mf_getuniquename();
  data casuser.&tmp ;
    &vcfmt
    set work.bitemp6_unique &vcrename;
    &vcassign
    &vcdrop
  run;

  /* load the table with varchars applied*/
  data &base_lib..&base_dsn (append=yes )/sessref=dcsession ;
    set casuser.&tmp;
  run;

  /* drop temp table */
  proc sql;
  drop table CASUSER.&tmp;

  /* this code will not work as regular tables do not have varchars */
  /*
  proc casutil;
    load data=work.bitemp6_unique
    outcaslib="&base_lib" casout="&base_dsn" append ;
  quit;
  */
%end;
%else %if &engine_type=REDSHIFT or &engine_type=POSTGRES %then %do;
  proc append base=&base_lib..&base_dsn
    %if &engine_type=REDSHIFT %then %do;
      (
      %do idx_pk=1 %to &redcnt;
        &&rednm&idx_pk = &&redval&idx_pk
      %end;
      )
    %end;
    data=bitemp6_unique force nowarn;
  run;
%end;
%else %do;
  proc append base=&base_lib..&base_dsn data=bitemp6_unique force nowarn; run;
%end;
|
|
|
|
/* load finished - release the lock on the target table */
%mp_lockanytable(UNLOCK
  ,lib=&base_lib
  ,ds=&base_dsn
  ,ref=&ETLSOURCE
  ,ctl_ds=&dclib..mpe_lockanytable
)

/* final check on syscc - anything above a warning fails the upload */
%mp_abort(iftrue= (&syscc >4)
  ,mac=&_program
  ,msg=%str(!!Upload NOT successful!! Failed on actual update / append stage..)
)
|
|
|
|
/**
  * Update the AUDIT table when one was supplied. Row-level differences are
  * captured with mp_storediffs, unchanged values in modified rows are
  * excluded, the result is appended to the audit table, and its lock is
  * released.
  */
%if &outds_audit ne 0 and &LOADTARGET=YES %then %do;
  /* rows of work.bitemp0_base with ___TMP___NEW_FLG=0 - presumably the
    records that existed before this load (TODO confirm) */
  data work.vw_outds_orig /view=work.vw_outds_orig;
    set work.bitemp0_base (drop=&md5_col);
    where ___TMP___NEW_FLG=0;
    drop ___TMP___NEW_FLG;
  run;

  %if %mf_existds(&outds_audit) %then %do;
    /* MPRINT is switched on so the generated mp_storediffs code is logged,
      then the session setting is restored instead of being left on */
    %local audit_mprint;
    %let audit_mprint=%sysfunc(getoption(mprint));
    options mprint;
    %mp_storediffs(&base_lib..&base_dsn
      ,work.vw_outds_orig
      ,&pk &bus_from
      ,delds=&outds_del
      ,modds=&outds_mod
      ,appds=&outds_add
      ,outds=work.mp_storediffs
      ,processed_dttm=&now
      ,loadref=%superq(etlsource)
    )
    options &audit_mprint;

    /* exclude unchanged values in modified rows */
    data work.mp_storediffs;
      set work.mp_storediffs;
      if MOVE_TYPE="M" and IS_PK=0 and IS_DIFF=0 then delete;
    run;

    proc append base=&outds_audit data=work.mp_storediffs;
    run;

    %mp_lockanytable(UNLOCK
      ,lib=%scan(&outds_audit,1,.)
      ,ds=%scan(&outds_audit,2,.)
      ,ref=&ETLSOURCE
      ,ctl_ds=&dclib..mpe_lockanytable
    )
  %end;
%end;

%mp_abort(iftrue= (&syscc >4)
  ,mac=bitemporal_dataloader
  ,msg=%str(Problem in audit stage (&outds_audit))
)
|
|
|
|
%let user=%mf_getUser();

/**
  Notify as appropriate EMAILS DISABLED

  %sumo_alerts(ALERT_EVENT=UPDATE
    , ALERT_TARGET=&base_lib..&base_dsn
    , from_user= &user);
*/

/* monitor BiTemporal usage: one row per load in MPE_DATALOADS */
%if &log=1 %then %do;
  %put syscc=&syscc;
  /* do not perform duration calc in pass through */
  %local dur;
  data _null_;
    /* seconds elapsed since the datetime held in the now macro variable,
      computed at full precision by datetime() at run time */
    dur=datetime()-&now;
    call symputx('dur',dur,'l');
  run;

  proc sql;
  insert into &dclib..mpe_dataloads
    set libref=%upcase("&base_lib")
      ,DSN=%upcase("&base_dsn")
      /* superq stops any ampersand / percent text in the supplied load
        message from being resolved by the macro processor */
      ,ETLSOURCE="%superq(ETLSOURCE)"
      ,LOADTYPE="&loadtype"
      ,CHANGED_RECORDS=%mf_getattrn(&lastds,NLOBS)
      ,NEW_RECORDS=%mf_getattrn(&outds_add,NLOBS)
      ,DELETED_RECORDS=%mf_getattrn(&outds_del,NLOBS)
      ,DURATION=&dur
      ,MAC_VER="v&ver"
      ,user_nm="&user"
      ,PROCESSED_DTTM=&now;
  quit;
  %put syscc=&syscc;
%end;
|
|
%end_of_macro:
|
|
%mend bitemporal_dataloader;
|