46cdeb0bab
adding more info to the staged directory in relation to deletes. Also refactoring the demo data.
248 lines
7.5 KiB
SAS
Executable File
248 lines
7.5 KiB
SAS
Executable File
/**
  @file
  @brief Closes out records
  @details Closes out records from a temporal table by reference to a single
  temporal range + business key.  Only live records are closed out, so the
  entire key should be provided in the input table EXCEPT the TECH_FROM.
  All records matching the key (as per the input table) are closed out
  on TECH_TO.

  Returns an updated base table and `&mpelib..mpe_dataloads` table

  The closeout statements are first written to a program file (fileref TMP),
  which is then executed with %include.  If the staging table is empty, the
  program file holds only the PROC SQL header, so zero records are updated.

  Potential improvements - write the update statements as a text file and retain
  for future reference!

  @param [in] tech_from= (tx_from_dttm) Technical FROM datetime variable.
    Required on BASE table only.
  @param [in] tech_to= (tx_to_dttm) Technical TO datetime variable.
    Required on BASE table only.
  @param [in] base_lib= (WORK) Libref of the BASE table.
  @param [in] base_dsn= (BASETABLE) Name of BASE table.
  @param [in] append_lib= (WORK) Libref of the STAGING table.
  @param [in] append_dsn= (APPENDTABLE) Name of STAGING table.
  @param [in] PK= (name sex) Business key, space separated.  Should INCLUDE
    the BUS_FROM field if relevant.
  @param [in] now= (DEFINE) Allows consistent tracking of tech dates. Should be
    a date literal, not a numeric constant, for DB compatibility.  The value
    must be within 24 hours (before or after) of the current datetime.
  @param [in] FILTER= Supply a filter to limit the update.  The text is
    appended as-is to the end of each generated where clause.
  @param [in] AUDITFOLDER= (0) Unquoted path to a directory into which a copy of
    the generated delete program will be written
  @param [in] loadtype= Set to UPDATE if non-temporal, else assumed
    to be TXTEMPORAL. Note that BITEMPORAL is treated the same as TXTEMPORAL
    given that BUS_FROM should be supplied in the PK.
  @param [in] loadtarget= (YES) If not YES, the closeout program is generated
    but not executed, and the macro returns without changing the base table.

  <h4> Global Variables </h4>
  @li `dc_dttmtfmt`

  <h4> SAS Macros </h4>
  @li mp_abort.sas
  @li mf_existvar.sas
  @li mf_getattrn.sas
  @li mf_getengine.sas
  @li mf_getuniquelibref.sas
  @li mf_getuniquename.sas
  @li mf_getuser.sas
  @li mf_getvartype.sas
  @li mp_lockanytable.sas
  @li dc_assignlib.sas

  @version 9.2
  @author 4GL Apps Ltd
  @copyright 4GL Apps Ltd. This code may only be used within Data Controller
  and may not be re-distributed or re-sold without the express permission of
  4GL Apps Ltd.
**/

%macro bitemporal_closeouts(
    tech_from=tx_from_dttm
    ,tech_to = tx_to_dttm   /* Technical TO datetime variable.
                              Required on BASE table only. */
    ,base_lib=WORK          /* Libref of the BASE table. */
    ,base_dsn=BASETABLE     /* Name of BASE table. */
    ,append_lib=WORK        /* Libref of the STAGING table. */
    ,append_dsn=APPENDTABLE /* Name of STAGING table. */
    ,PK= name sex           /* Business key, space separated. */
                            /* Should INCLUDE BUS_FROM field if relevant. */
    ,NOW=DEFINE
    ,FILTER=                /* supply a filter to limit the update */
    ,AUDITFOLDER=0
    ,loadtype=
    ,loadtarget=YES /* if <> YES will return without changing anything */
  );
  %put ENTERING &sysmacroname;
  %local x var start;
  %let start=%sysfunc(datetime());
  %dc_assignlib(WRITE,&base_lib)
  %dc_assignlib(WRITE,&append_lib)

  %if &now=DEFINE %then %let now=&dc_dttmtfmt.;
  %put &=now;

  /**
    * perform basic checks
    */
  /* do tables exist? */
  %mp_abort(
    iftrue=(%sysfunc(exist(&base_lib..&base_dsn)) ne 1),
    msg=&base_lib..&base_dsn does not exist
  )
  %mp_abort(
    iftrue=(%sysfunc(exist(&append_lib..&append_dsn))=0
      and %sysfunc(exist(&append_lib..&append_dsn,VIEW))=0 ),
    msg=&append_lib..&append_dsn does not exist
  )

  /* do TX columns exist? */
  %if &loadtype ne UPDATE %then %do;
    %if not %mf_existvar(&base_lib..&base_dsn,&tech_from) %then %do;
      %mp_abort(msg=&tech_from does not exist on &base_lib..&base_dsn)
    %end;
    %else %if not %mf_existvar(&base_lib..&base_dsn,&tech_to) %then %do;
      %mp_abort(msg=&tech_to does not exist on &base_lib..&base_dsn)
    %end;
  %end;

  /* do PK columns exist? */
  %do x=1 %to %sysfunc(countw(&PK));
    %let var=%scan(&pk,&x,%str( ));
    %if not %mf_existvar(&base_lib..&base_dsn,&var) %then %do;
      %mp_abort(msg=&var does not exist on &base_lib..&base_dsn)
    %end;
    %else %if not %mf_existvar(&append_lib..&append_dsn,&var) %then %do;
      %mp_abort(msg=&var does not exist on &append_lib..&append_dsn)
    %end;
  %end;

  /* check uniqueness - duplicates are reported but do not stop the process */
  proc sort data=&append_lib..&append_dsn
      out=___closeout1 noduprecs dupout=___closeout1a;
    by &pk;
  run;
  %if %mf_getattrn(___closeout1a,NLOBS)>0 %then
    %put NOTE: dups on (&PK) in (&append_lib..&append_dsn);

  /* is &NOW value within a tolerance? Should not allow renegade closeouts.. */
  %local gap;
  %let gap=0;
  data _null_;
    now=&now;
    /* absolute distance, so that future-dated values are rejected as well */
    gap=abs(intck('HOURS',now,datetime()));
    /* a missing gap would slip through the macro comparison, so force a fail */
    if missing(gap) then gap=99999;
    call symputx('gap',gap,'l');
  run;
  %mp_abort(
    iftrue=(&gap > 24),
    msg=NOW variable (&now) is not within a 24hr tolerance
  )

  /* have any warnings / errs occurred thus far?  If so, abort */
  %mp_abort(
    iftrue=(&syscc>0),
    msg=Aborted due to SYSCC=&SYSCC status
  )

  /* set up folder - temporary locations are used if no AUDITFOLDER is given */
  %local tmplib;%let tmplib=%mf_getuniquelibref();
  %if "&AUDITFOLDER"="0" %then %do;
    filename tmp temp lrecl=10000;
    libname &tmplib (work);
  %end;
  %else %do;
    filename tmp "&AUDITFOLDER/deleterecords.sas" lrecl=10000;
    libname &tmplib "&AUDITFOLDER";
  %end;

  /**
    * Create closeout statements.  If UPDATE approach and CAS engine, use the
    * DeleteRows action (as regular SQL deletes are not supported).
    * Otherwise, the deletions are sent as individual SQL statements
    * to ensure pass-through utilisation.  The update_cnt variable monitors
    * how many records were actually updated on the target table.
    */
  %local update_cnt etype;
  %let update_cnt=0;
  %let etype=%mf_getengine(&base_lib);
  %put &=etype;

  %if &loadtype=UPDATE and &etype=CAS %then %do;
    /* create temp table for deletions */
    %local delds;%let delds=%mf_getuniquename(prefix=DC);
    data casuser.&delds &tmplib..deleterecords;
      set work.___closeout1;
      keep &pk;
    run;
    /* build the proc */
    data _null_;
      file tmp;
      put "/* libname approve '&AUDITFOLDER'; */";
      put 'proc cas;table.deleteRows result=r/ table={' ;
      put " caslib='&base_lib',name='&base_dsn',where='1=1',";
      put " whereTable={caslib='CASUSER',name='&delds'}";
      put "};";
      put "call symputx('update_cnt',r.RowsDeleted);";
      put "quit;";
      put "data;set casuser.&delds;putlog (_all_)(=);run;";
      put '%put &=update_cnt;';
      put "proc sql;drop table CASUSER.&delds;";
      stop;
    run;
  %end;
  %else %do;
    data _null_;
      /* FILE (and the header) come before SET so that the program file is
        always created, even when there are no records to close out.  If SET
        came first, an empty input would end the step before anything was
        written and the later include of the program would fail. */
      file tmp;
      if _n_=1 then put 'proc sql noprint;' ;
      set ___closeout1;
      length string $32767.;
      %if &loadtype=UPDATE %then %do;
        put "delete from &base_lib..&base_dsn where 1";
      %end;
      %else %do;
        /* only live records (tech_to later than NOW) are closed out */
        now=symget('now');
        put "update &base_lib..&base_dsn set &tech_to= " now @;
        %if %mf_existvar(&base_lib..&base_dsn,PROCESSED_DTTM) %then %do;
          put " ,PROCESSED_DTTM=" now @;
        %end;
        put " where " now " lt &tech_to ";
      %end;
      /* one key predicate per PK variable, typed as per the BASE table */
      %do x=1 %to %sysfunc(countw(&PK));
        %let var=%scan(&pk,&x,%str( ));
        %if %mf_getvartype(&base_lib..&base_dsn,&var)=C %then %do;
          /* use single quotes to avoid ampersand resolution in data */
          string=" & &var='"!!trim(prxchange("s/'/''/",-1,&var))!!"'";
        %end;
        %else %do;
          string=cats(" & &var=",&var);
        %end;
        put string;
      %end;
      put "&filter ;";
      /* accumulate the number of rows touched by each statement */
      put '%let update_cnt=%eval(&update_cnt+&sqlobs);';
      put '%put update_cnt=&update_cnt;';
    run;
  %end;

  %if &loadtarget ne YES %then %do;
    /* release the uniquely named libref before leaving (nothing is executed).
      The TMP fileref is left assigned, as it was previously. */
    libname &tmplib clear;
    %return;
  %end;

  /* ensure we have a lock */
  %mp_lockanytable(LOCK,
    lib=&base_lib,ds=&base_dsn
    ,ref=bitemporal_closeouts
    ,ctl_ds=&mpelib..mpe_lockanytable
  )

  /* execute the generated closeout program */
  options source2;
  %inc tmp;

  filename tmp clear;
  /* the libref was only needed whilst generating the program */
  libname &tmplib clear;

  /**
    * Update audit tracker
    */

  %local newobs; %let newobs=%mf_getattrn(work.___closeout1,NLOBS);
  %local user; %let user=%mf_getuser();
  proc sql;
  insert into &mpelib..mpe_dataloads
    set libref=%upcase("&base_lib")
      ,DSN=%upcase("&base_dsn")
      ,ETLSOURCE="&append_lib..&append_dsn contained &newobs records"
      ,LOADTYPE="CLOSEOUT"
      ,DELETED_RECORDS=&update_cnt
      ,NEW_RECORDS=0
      ,DURATION=%sysfunc(datetime())-&start
      ,USER_NM="&user"
      ,PROCESSED_DTTM=&now;
  quit;

%mend bitemporal_closeouts;
|