@@ -136,37 +136,48 @@ func (tm *TableMappingManager) ProcessTableValueAndUpdateIdMapping(dbID int64, t
136
136
}
137
137
138
138
func (tm * TableMappingManager ) MergeBaseDBReplace (baseMap map [UpstreamID ]* DBReplace ) {
139
- // merge baseMap to DBReplaceMap
140
- // also updating globalIdMap, not going to be used in prod but convenient in test
139
+ // first pass: update all global IDs
141
140
for upstreamID , baseDBReplace := range baseMap {
142
141
tm .globalIdMap [upstreamID ] = baseDBReplace .DbID
143
142
144
- if existingDBReplace , exists := tm . DBReplaceMap [ upstreamID ]; exists {
145
- existingDBReplace . DbID = baseDBReplace . DbID
143
+ for tableUpID , baseTableReplace := range baseDBReplace . TableMap {
144
+ tm . globalIdMap [ tableUpID ] = baseTableReplace . TableID
146
145
147
- for tableUpID , baseTableReplace := range baseDBReplace .TableMap {
148
- tm .globalIdMap [tableUpID ] = baseTableReplace .TableID
146
+ for partUpID , basePartDownID := range baseTableReplace .PartitionMap {
147
+ tm .globalIdMap [partUpID ] = basePartDownID
148
+ }
149
+ }
150
+ }
149
151
150
- if existingTableReplace , tableExists := existingDBReplace .TableMap [tableUpID ]; tableExists {
151
- existingTableReplace .TableID = baseTableReplace .TableID
152
+ // second pass: update the DBReplaceMap
153
+ // first update all existing entries using the global ID map
154
+ for _ , existingDBReplace := range tm .DBReplaceMap {
155
+ if newID , exists := tm .globalIdMap [existingDBReplace .DbID ]; exists {
156
+ existingDBReplace .DbID = newID
157
+ }
152
158
153
- for partUpID , basePartDownID := range baseTableReplace .PartitionMap {
154
- tm .globalIdMap [partUpID ] = basePartDownID
155
- existingTableReplace .PartitionMap [partUpID ] = basePartDownID
156
- }
157
- } else {
158
- existingDBReplace .TableMap [tableUpID ] = baseTableReplace
159
- for partUpID , basePartDownID := range baseTableReplace .PartitionMap {
160
- tm .globalIdMap [partUpID ] = basePartDownID
161
- }
159
+ for _ , existingTableReplace := range existingDBReplace .TableMap {
160
+ if newID , exists := tm .globalIdMap [existingTableReplace .TableID ]; exists {
161
+ existingTableReplace .TableID = newID
162
+ }
163
+
164
+ for partUpID , partDownID := range existingTableReplace .PartitionMap {
165
+ if newID , exists := tm .globalIdMap [partDownID ]; exists {
166
+ existingTableReplace .PartitionMap [partUpID ] = newID
162
167
}
163
168
}
164
- } else {
169
+ }
170
+ }
171
+
172
+ // then add any new entries from the base map
173
+ for upstreamID , baseDBReplace := range baseMap {
174
+ if _ , exists := tm .DBReplaceMap [upstreamID ]; ! exists {
165
175
tm .DBReplaceMap [upstreamID ] = baseDBReplace
176
+ } else {
177
+ existingDBReplace := tm .DBReplaceMap [upstreamID ]
166
178
for tableUpID , baseTableReplace := range baseDBReplace .TableMap {
167
- tm .globalIdMap [tableUpID ] = baseTableReplace .TableID
168
- for partUpID , basePartDownID := range baseTableReplace .PartitionMap {
169
- tm .globalIdMap [partUpID ] = basePartDownID
179
+ if _ , exists := existingDBReplace .TableMap [tableUpID ]; ! exists {
180
+ existingDBReplace .TableMap [tableUpID ] = baseTableReplace
170
181
}
171
182
}
172
183
}
@@ -274,18 +285,18 @@ func (tm *TableMappingManager) ReplaceTemporaryIDs(
274
285
return nil
275
286
}
276
287
277
- func (tm * TableMappingManager ) FilterDBReplaceMap (filter * utils.PiTRTableTracker ) {
288
+ func (tm * TableMappingManager ) FilterDBReplaceMap (tracker * utils.PiTRTableTracker ) {
278
289
// iterate through existing DBReplaceMap
279
290
for dbID , dbReplace := range tm .DBReplaceMap {
280
291
// remove entire database if not in filter
281
- if ! filter .ContainsDB (dbID ) {
292
+ if ! tracker .ContainsDB (dbID ) {
282
293
delete (tm .DBReplaceMap , dbID )
283
294
continue
284
295
}
285
296
286
297
// filter tables in this database
287
298
for tableID := range dbReplace .TableMap {
288
- if ! filter .ContainsTable (dbID , tableID ) {
299
+ if ! tracker .ContainsTable (dbID , tableID ) {
289
300
delete (dbReplace .TableMap , tableID )
290
301
}
291
302
}
0 commit comments